From 6dd45df33c9f3136bffa47bcabd5c092d219dfe7 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 24 Sep 2024 18:59:19 +0000 Subject: [PATCH] WIP adapt DW PR#2568 to use accumulo PR#4898 These draft changes build on #2568 with the following differences. * Compute bulkv2 load plans using new unreleased APIs in accumulo PR 4898 * The table splits are loaded at the beginning of writing to rfiles instead of at the end. Not sure about the overall implications of this change on memory use in reducers. The load plan could be computed after the rfile is closed using a new API in 4898 if deferring the loading of tablet splits is desired. * Switches to using accumulo public APIs for writing rfiles instead of internal accumulo methods. Well, public once they are actually released. * The algorithm to compute the load plan does less work per key/value. Should be roughly constant time vs log(N). * Adds a simple SortedList class. The reason this was added is that this code does binary searches on lists; however, it was not certain those lists were actually sorted. If the list was not sorted it would not cause exceptions in binary search but could lead to incorrect load plans and lost data. This new SortedList class ensures lists are sorted and allows this assurance to travel around in the code. Maybe this change should be its own PR. --- microservices/starters/datawave | 2 +- pom.xml | 2 +- .../job/BulkIngestMapFileLoader.java | 5 +- .../job/MultiRFileOutputFormatter.java | 265 +++--------------- .../ingest/mapreduce/job/SortedList.java | 77 +++++ .../ingest/mapreduce/job/SplitsFile.java | 4 +- .../mapreduce/job/TableSplitsCache.java | 22 +- .../partition/BalancedShardPartitioner.java | 9 +- .../partition/MultiTableRangePartitioner.java | 7 +- .../job/MultiRFileOutputFormatterTest.java | 58 ++-- .../mapreduce/job/TableSplitsCacheTest.java | 10 +- 11 files changed, 177 insertions(+), 284 deletions(-) create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java diff --git a/microservices/starters/datawave b/microservices/starters/datawave index 2baa5f42a43..27a6063348c 160000 --- a/microservices/starters/datawave +++ b/microservices/starters/datawave @@ -1 +1 @@ -Subproject commit 2baa5f42a4369ba583c359c5e2cbf8e38f1a59b6 +Subproject commit 27a6063348cadb190bfe15282c3d1444dfb5b2fa diff --git a/pom.xml b/pom.xml index dfd2d6ba3db..10770bef1d7 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ 11 UTF-8 1C - 2.1.2 + 2.1.4-97e4684860 1.4.1.Final 1.0.0.Final 3.20.2 diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index b95a7a6a177..936e5174395 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -88,7 +88,6 @@ */ public final class BulkIngestMapFileLoader implements Runnable { private static final Logger log = Logger.getLogger(BulkIngestMapFileLoader.class); - private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create(); private static int SLEEP_TIME = 30000; private static int FAILURE_SLEEP_TIME = 10 * 60 * 1000; // 10 minutes private static int MAX_DIRECTORIES = 1; @@ -1014,13 +1013,13 @@ private LoadPlan getLoadPlan() throws IOException { in.readFully(0, buffer); String s = new
String(buffer, StandardCharsets.UTF_8); // TODO: Use Gson streaming api instead to minimize impact on heap and cpu - builder.addPlan(gson.fromJson(s, LoadPlan.class)); + builder.addPlan(LoadPlan.fromJson(s)); } } LoadPlan lp = builder.build(); log.debug("Completed deserializing load plan for " + tableDir); if (log.isTraceEnabled()) { - log.trace("Consolidated LoadPlan for " + tableDir + ": " + gson.toJson(lp)); + log.trace("Consolidated LoadPlan for " + tableDir + ": " + lp.toJson()); } return lp; } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java index 965d71eec09..c063c927bb2 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java @@ -3,27 +3,24 @@ import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG; import static org.apache.accumulo.core.conf.Property.TABLE_CRYPTO_PREFIX; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; @@ -32,11 +29,9 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.LoadPlan; -import org.apache.accumulo.core.data.LoadPlan.RangeType; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoService; @@ -60,8 +55,6 @@ import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; @@ -73,9 +66,6 @@ public class MultiRFileOutputFormatter extends FileOutputFormat writers = null; protected boolean loadPlanningEnabled = false; protected Map> loadPlans = null; @@ -264,7 +254,7 @@ protected void createAndRegisterWriter(String key, String table, Path filename, filename = insertFileCount(filename, count.intValue()); // now create and register the writer - SizeTrackingWriter writer = openWriter(filename.toString(), tableConf); + SizeTrackingWriter writer = openWriter(filename.toString(), tableConf, table); writer.startDefaultLocalityGroup(); writers.put(key, writer); 
unusedWriterPaths.put(key, filename); @@ -275,14 +265,20 @@ protected void createAndRegisterWriter(String key, String table, Path filename, } } - protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) throws IOException { + protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) throws IOException { startWriteTime = System.currentTimeMillis(); // @formatter:off - CryptoService cs = CryptoFactoryLoader.getServiceForClient( - CryptoEnvironment.Scope.TABLE, tableConf.getAllCryptoProperties()); - return new SizeTrackingWriter( - FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, conf, cs).withTableConfiguration(tableConf).build(), - this.loadPlanningEnabled); + + var builder = org.apache.accumulo.core.client.rfile.RFile.newWriter().to(filename).withFileSystem(fs).withTableProperties(tableConf); + if(this.loadPlanningEnabled) { + var splits = SplitsFile.getSplits(conf, table).get(); + if(splits != null && !splits.isEmpty()) { + LoadPlan.SplitResolver splitResolver = row->findContainingSplits(row, splits); + builder = builder.withSplitResolver(splitResolver); + } + } + + return new SizeTrackingWriter(builder.build()); // @formatter:on } @@ -304,10 +300,10 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce Path file = usedWriterPaths.get(key); // don't bother if this writer has not been used yet if (file != null) { - if (writer.isLoadPlanning()) { - computeLoadPlan(writer, table, file); - } writer.close(); + if (loadPlanningEnabled) { + addLoadPlan(table, writer.getLoadPlan(file.getName())); + } // pull the index off the filename file = removeFileCount(file); createAndRegisterWriter(key, table, file, tableConfigs.get(table)); @@ -315,29 +311,8 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce } } - /** - * Init table's LoadPlan list and compute the plan for the given rfile asynchronously - * - * @param writer - * RFile writer - * @param tableName - * table name - * @param rfile - * rfile path - */ - protected void computeLoadPlan(SizeTrackingWriter writer, String tableName, Path rfile) { - if (!loadPlans.containsKey(tableName)) { - loadPlans.put(tableName, new LinkedList<>()); - } - // @formatter:off - loadPlanFutures.add(CompletableFuture - .supplyAsync(() -> compute(rfile, writer.rows, tableName, conf)) - .thenAccept(plan -> { - if (plan != null) { - loadPlans.get(tableName).add(plan); - } - })); - // @formatter:on + private void addLoadPlan(String tableName, LoadPlan loadPlan) { + loadPlans.computeIfAbsent(tableName, t -> new ArrayList<>()).add(loadPlan); } protected void commitLoadPlans(TaskAttemptContext context) { @@ -362,12 +337,11 @@ private void writeLoadPlans(TaskAttemptContext context) { var table = entry.getKey(); var path = new Path(String.format("%s/%s", workDir, table), getUniqueFile(context, "loadplan", ".json")); var loadPlan = builder.build(); - // TODO: Use Gson streaming api instead (JsonWriter) to minimize impact on heap //@formatter:off futures.add(CompletableFuture.runAsync(() -> { try (FSDataOutputStream out = fs.create(path)) { log.debug("Begin writing load plan for " + path); - out.write(gson.toJson(loadPlan).getBytes(StandardCharsets.UTF_8)); + out.write(loadPlan.toJson().getBytes(StandardCharsets.UTF_8)); log.debug("Completed writing load plan for " + path); } catch (IOException ioe) { log.error("Failed to write plan for " + path, ioe); @@ -387,67 +361,7 @@ private void writeLoadPlans(TaskAttemptContext 
context) { } /** - * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table. - * - * @param rfile - * RFile path - * @param rfileRows - * Set of rows contained in the RFile - * @param tableName - * Table whose splits are to be examined to create the mapping - * @param conf - * The configuration required to retrieve table splits - * @return LoadPlan for the RFile - */ - static LoadPlan compute(Path rfile, SortedSet rfileRows, String tableName, Configuration conf) { - if (rfileRows != null && !rfileRows.isEmpty()) { - try { - var splits = SplitsFile.getSplits(conf, tableName); - return compute(rfile, rfileRows, splits); - } catch (Exception e) { - throw new RuntimeException("Failed to retrieve splits!", e); - } - } - return null; - } - - /** - * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table. - * - * @param rfile - * RFile path - * @param rfileRows - * Set of rows contained in the RFile - * @param tableSplits - * Splits for the table being targeted - * @return LoadPlan for the RFile. - */ - static LoadPlan compute(Path rfile, SortedSet rfileRows, List tableSplits) { - // @formatter:off - var builder = LoadPlan.builder(); - if (tableSplits == null || tableSplits.isEmpty()) { - // RFile examination and mapping handled later by bulk import client - log.debug("Calculating FILE load plan for " + rfile); - var ke = new KeyExtent(rfileRows.first(), rfileRows.last()); - builder.addPlan(LoadPlan.builder().loadFileTo( - rfile.getName(), RangeType.FILE, ke.prevEndRow, ke.endRow).build()); - } else { - // Compute extent mapping so that we can skip examination at load time - log.debug("Calculating TABLE load plan for " + rfile); - rfileRows.stream() - .map(row -> findKeyExtent(row, tableSplits)) - .collect(Collectors.toCollection(HashSet::new)) - .forEach(ke -> builder.addPlan(LoadPlan.builder().loadFileTo( - rfile.getName(), RangeType.TABLE, ke.prevEndRow, ke.endRow).build()) - ); - log.debug("Table load plan completed for file: " + rfile); - } - // @formatter:on - return builder.build(); - } - - /** - * Finds the KeyExtent where the specified row should reside + * Finds the two contiguous table splits that bound the tablet in which the specified row should reside * * @param lookupRow * Row value to be mapped @@ -455,51 +369,20 @@ static LoadPlan compute(Path rfile, SortedSet rfileRows, List tableS * Splits for the table in question * @return KeyExtent mapping for the given row */ - static KeyExtent findKeyExtent(Text lookupRow, List tableSplits) { - var ke = new KeyExtent(); - var ceilingIdx = findCeiling(lookupRow, tableSplits); - if (ceilingIdx == -1) { - // Last tablet (endRow remains null) - ke.prevEndRow = tableSplits.get(tableSplits.size() - 1); - } else { - ke.endRow = tableSplits.get(ceilingIdx); - // If ceiling == 0, then first tablet (prevEndRow remains null) - if (ceilingIdx > 0) { - ke.prevEndRow = tableSplits.get(ceilingIdx - 1); - } + static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List tableSplits) { + int position = Collections.binarySearch(tableSplits, lookupRow); + if (position < 0) { + position = -1 * (position + 1); } - return ke; - } - /** - * Performs binary search on tableSplits to find the index of the first (least) split ≥ lookupRow - * - * @param lookupRow - * Row for which we want to find the ceiling - * @param tableSplits - * Sorted table tableSplits list whose implementation is assumed to provide fast random access (i.e., {@link
java.util.RandomAccess}) - * @return index of the first split ≥ lookupRow, or -1 if (lookupRow > tableSplits.get(tableSplits.size()-1) - */ - static int findCeiling(Text lookupRow, List tableSplits) { - int begin = 0; - int end = tableSplits.size() - 1; - int ceiling = -1; - while (begin <= end) { - int middle = (begin + end) / 2; - if (tableSplits.get(middle).compareTo(lookupRow) >= 0) { - end = middle - 1; - ceiling = middle; - } else { - begin = middle + 1; - } - } - return ceiling; + Text prevRow = position == 0 ? null : tableSplits.get(position - 1); + Text endRow = position == tableSplits.size() ? null : tableSplits.get(position); + + return new LoadPlan.TableSplits(prevRow, endRow); } - public static class SizeTrackingWriter implements FileSKVWriter { - private FileSKVWriter delegate; - private boolean loadPlanning; - SortedSet rows; + public static class SizeTrackingWriter { + private RFileWriter delegate; long size = 0; int entries = 0; @@ -511,12 +394,9 @@ public int getNumEntries() { return entries; } - public boolean supportsLocalityGroups() { - return delegate.supportsLocalityGroups(); - } - public void startNewLocalityGroup(String name, Set columnFamilies) throws IOException { - delegate.startNewLocalityGroup(name, columnFamilies); + var fams = columnFamilies.stream().map(bs -> bs.toArray()).collect(Collectors.toList()); + delegate.startNewLocalityGroup(name, fams); } public void startDefaultLocalityGroup() throws IOException { @@ -527,34 +407,22 @@ public void append(Key key, Value value) throws IOException { entries++; size += key.getLength() + (value == null ? 0 : value.getSize()); delegate.append(key, value); - if (loadPlanning) { - rows.add(key.getRow()); - } - } - - public DataOutputStream createMetaStore(String name) throws IOException { - return delegate.createMetaStore(name); } public void close() throws IOException { delegate.close(); } - @Override public long getLength() throws IOException { return getSize(); } - public boolean isLoadPlanning() { - return loadPlanning; + public LoadPlan getLoadPlan(String filename) { + return delegate.getLoadPlan(filename); } - public SizeTrackingWriter(FileSKVWriter delegate, boolean loadPlanning) { + public SizeTrackingWriter(RFileWriter delegate) { this.delegate = delegate; - this.loadPlanning = loadPlanning; - if (this.loadPlanning) { - rows = new TreeSet<>(); - } } } @@ -797,12 +665,13 @@ public void write(BulkIngestKey key, Value value) throws IOException { public void close(TaskAttemptContext context) throws IOException, InterruptedException { for (Map.Entry entry : writers.entrySet()) { var writer = entry.getValue(); - if (writer.isLoadPlanning()) { + writer.close(); + if (loadPlanningEnabled) { var tableName = writerTableNames.get(entry.getKey()); var rfilePath = usedWriterPaths.get(entry.getKey()); - computeLoadPlan(writer, tableName, rfilePath); + addLoadPlan(tableName, writer.getLoadPlan(rfilePath.getName())); } - writer.close(); + } if (loadPlanningEnabled) { @@ -921,56 +790,4 @@ protected Map getShardLocations(String tableName) throws IOExceptio return tableShardLocations.get(tableName); } - - /** - * Simplified representation of an Accumulo KeyExtent, used here to track mapped tablets during LoadPlan creation (and to avoid yet another Accumulo - * "disallowed import") - */ - static class KeyExtent implements Comparable { - Text prevEndRow = null; - Text endRow = null; - - KeyExtent(Text prevEndRow, Text endRow) { - this.prevEndRow = prevEndRow; - this.endRow = endRow; - } - - KeyExtent() {} - - private static final 
Comparator COMPARATOR = Comparator.comparing(KeyExtent::endRow, Comparator.nullsLast(Text::compareTo)) - .thenComparing(KeyExtent::prevEndRow, Comparator.nullsFirst(Text::compareTo)); - - public Text endRow() { - return endRow; - } - - public Text prevEndRow() { - return prevEndRow; - } - - @Override - public int compareTo(KeyExtent other) { - return COMPARATOR.compare(this, other); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - KeyExtent keyExtent = (KeyExtent) o; - return Objects.equals(endRow, keyExtent.endRow) && Objects.equals(prevEndRow, keyExtent.prevEndRow); - } - - @Override - public int hashCode() { - return Objects.hash(endRow, prevEndRow); - } - - @Override - public String toString() { - return (prevEndRow == null ? "null" : prevEndRow.toString()) + ";" + (endRow == null ? "null" : endRow.toString()); - } - } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java new file mode 100644 index 00000000000..41703ec13da --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java @@ -0,0 +1,77 @@ +package datawave.ingest.mapreduce.job; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.apache.log4j.Logger; + +/** + * Wraps a list that is immutable and verified as sorted. + */ +public class SortedList { + + private static final Logger log = Logger.getLogger(SortedList.class); + + private final List list; + + private SortedList(List list) { + this.list = list; + } + + public List get() { + return list; + } + + private static final SortedList EMPTY = new SortedList<>(List.of()); + + @SuppressWarnings("unchecked") + public static SortedList empty() { + return (SortedList) EMPTY; + } + + /** + * For a list that is expected to be sorted this will verify it is sorted and if so return an immutable copy of it. If this list is not sorted it will log a + * warning, copy it, sort the copy, and return an immutable version of the copy. + */ + public static SortedList fromSorted(List list) { + if (list.isEmpty()) { + return empty(); + } + + var copy = List.copyOf(list); + + // verify after copying because nothing can change at this point + boolean isSorted = true; + for (int i = 1; i < copy.size(); i++) { + @SuppressWarnings("unchecked") + var prev = (Comparable) copy.get(i - 1); + if (prev.compareTo(copy.get(i)) > 0) { + isSorted = false; + } + } + + if (isSorted) { + return new SortedList<>(copy); + } else { + log.warn("Input list of size " + copy.size() + " was expected to be sorted but was not", new IllegalArgumentException()); + return fromUnsorted(copy); + } + } + + /** + * Copies a list and sorts the copy returning an immutable version of the copy. 
+ */ + public static SortedList fromUnsorted(List list) { + if (list.isEmpty()) { + return empty(); + } + + var copy = new ArrayList<>(list); + @SuppressWarnings("unchecked") + var comparator = (Comparator) Comparator.naturalOrder(); + copy.sort(comparator); + return new SortedList<>(Collections.unmodifiableList(copy)); + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java index 28283f940b9..00a84f088d5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java @@ -186,7 +186,7 @@ private static boolean shardsAreBalanced(Map locations, String date return dateIsBalanced; } - public static Map> getSplits(Configuration conf) throws IOException { + public static Map> getSplits(Configuration conf) throws IOException { return TableSplitsCache.getCurrentCache(conf).getSplits(); } @@ -195,7 +195,7 @@ public static Map getSplitsAndLocations(Configuration conf, String return TableSplitsCache.getCurrentCache(conf).getSplitsAndLocationByTable(tableName); } - public static List getSplits(Configuration conf, String tableName) throws IOException { + public static SortedList getSplits(Configuration conf, String tableName) throws IOException { return TableSplitsCache.getCurrentCache(conf).getSplits(tableName); } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index ab901845675..32337c3cfa5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -69,7 +69,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private Path splitsPath = null; private Map> splitLocations = new HashMap<>(); - private Map> splits = new HashMap<>(); + private Map> splits = new HashMap<>(); private PartitionerCache partitionerCache; @@ -116,6 +116,7 @@ public static void clear() { * maximum number of splits to return * @return split points grouped into fewer evenly grouped splits so as not to exceed maxSplits */ + // TODO seems like this code is only used in test public static List trimSplits(List tableSplits, int maxSplits) { if (tableSplits.size() <= maxSplits) { return tableSplits; } @@ -363,6 +364,7 @@ protected void readCache(BufferedReader in) throws IOException { String line; String tableName = null; Map tmpSplitLocations = new ShardLocationTrieMap(); + Map> tableSplits = new HashMap<>(); List tmpSplits = null; while ((line = in.readLine()) != null) { @@ -374,7 +376,7 @@ protected void readCache(BufferedReader in) throws IOException { tableName = parts[0]; tmpSplitLocations = new ShardLocationTrieMap(); tmpSplits = new ArrayList<>(); - this.splits.put(tableName, Collections.unmodifiableList(tmpSplits)); + tableSplits.put(tableName, tmpSplits); } if (parts.length >= 2) { Text split = new Text(Base64.decodeBase64(parts[1])); @@ -387,6 +389,11 @@ protected void readCache(BufferedReader in) throws IOException { if (!tmpSplitLocations.isEmpty()) { this.splitLocations.put(tableName, tmpSplitLocations); } + + tableSplits.forEach((table, splits) -> { + this.splits.put(table, SortedList.fromSorted(splits)); + }); + in.close(); } @@ -407,12 +414,12 @@ private String dedup(Map dedupMap, String
value) { * @throws IOException * for issues with read or write */ - public List getSplits(String table) throws IOException { + public SortedList getSplits(String table) throws IOException { if (this.splits.isEmpty()) { read(); } - List splitList = this.splits.get(table); - return (splitList == null ? Collections.emptyList() : splitList); + SortedList splitList = this.splits.get(table); + return (splitList == null ? SortedList.empty() : splitList); } /** @@ -425,8 +432,9 @@ public List getSplits(String table) throws IOException { * @throws IOException * for issues with read or write */ + // TODO seems like this code is only used in test, can it be removed? Looked into making it return SortedList public List getSplits(String table, int maxSplits) throws IOException { - return trimSplits(getSplits(table), maxSplits); + return trimSplits(getSplits(table).get(), maxSplits); } /** @@ -434,7 +442,7 @@ public List getSplits(String table, int maxSplits) throws IOException { * @throws IOException * for issues with read or write */ - public Map> getSplits() throws IOException { + public Map> getSplits() throws IOException { if (this.splits.isEmpty()) read(); return Collections.unmodifiableMap(splits); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java index 4eb8b2b43fe..f09618a0585 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java @@ -23,6 +23,7 @@ import datawave.ingest.mapreduce.handler.shard.ShardIdFactory; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SortedList; import datawave.ingest.mapreduce.job.SplitsFile; import datawave.util.time.DateHelper; @@ -134,7 +135,7 @@ private HashMap getPartitionsByShardId(String tableName) throws IO if (log.isDebugEnabled()) log.debug("Loading splits data for " + tableName); - List sortedSplits = SplitsFile.getSplits(conf, tableName); + SortedList sortedSplits = SplitsFile.getSplits(conf, tableName); Map shardIdToLocation = SplitsFile.getSplitsAndLocations(conf, tableName); if (log.isDebugEnabled()) @@ -158,7 +159,7 @@ private HashMap getPartitionsByShardId(String tableName) throws IO * the map of shard ids and their location * @return shardId to */ - private HashMap assignPartitionsForEachShard(List sortedShardIds, Map shardIdToLocations) { + private HashMap assignPartitionsForEachShard(SortedList sortedShardIds, Map shardIdToLocations) { int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations); HashMap partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations); @@ -178,11 +179,11 @@ private int calculateNumberOfUniqueTservers(Map shardIdToLocations) return totalNumUniqueTServers; } - private HashMap getTServerAssignments(int totalNumTServers, List sortedShardIds, Map shardIdsToTservers) { + private HashMap getTServerAssignments(int totalNumTServers, SortedList sortedShardIds, Map shardIdsToTservers) { HashMap partitionsByTServer = new HashMap<>(totalNumTServers); int nextAvailableSlot = 0; boolean alreadySkippedFutureShards = false; - for (Text shard : sortedShardIds) { + for (Text shard : sortedShardIds.get()) { if (alreadySkippedFutureShards 
|| !isFutureShard(shard)) { // short circuiting for performance alreadySkippedFutureShards = true; String location = shardIdsToTservers.get(shard); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java index dba0e0547fd..5cee4854234 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java @@ -18,6 +18,7 @@ import org.apache.log4j.Logger; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SortedList; import datawave.ingest.mapreduce.job.SplitsFile; /** @@ -82,7 +83,7 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) { String tableName = key.getTableName().toString(); - List cutPointArray = null; + SortedList cutPointArray = null; try { cutPointArray = SplitsFile.getSplits(conf, tableName); } catch (IOException e) { @@ -92,8 +93,8 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) { return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions; } key.getKey().getRow(holder); - int index = Collections.binarySearch(cutPointArray, holder); - index = calculateIndex(index, numPartitions, tableName, cutPointArray.size()); + int index = Collections.binarySearch(cutPointArray.get(), holder); + index = calculateIndex(index, numPartitions, tableName, cutPointArray.get().size()); index = partitionLimiter.limit(numPartitions, index); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java index be7ea9df7c4..68ac0368e31 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java @@ -1,7 +1,7 @@ package datawave.ingest.mapreduce.job; import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG; -import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findKeyExtent; +import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findContainingSplits; import static org.junit.Assert.assertEquals; import java.io.DataOutputStream; @@ -28,7 +28,6 @@ import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordWriter; @@ -49,7 +48,6 @@ import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode; -import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.KeyExtent; import datawave.util.TableName; public class MultiRFileOutputFormatterTest { @@ -235,19 +233,20 @@ public void testPlanning() { rfileRows.add(new Text("20170601_9")); rfileRows.add(new Text("20200601_9")); - Set expectedExtents = new HashSet<>(); - expectedExtents.add(new KeyExtent(new Text("20170601_0"), new Text("20170601_1"))); - expectedExtents.add(new KeyExtent(new Text("20170601_8"), new Text("20170601_9"))); - expectedExtents.add(new 
KeyExtent(new Text("20170602_0"), new Text("20170602_1"))); - expectedExtents.add(new KeyExtent(new Text("20170603_9"), null)); - expectedExtents.add(new KeyExtent(new Text("20170603_0c"), new Text("20170603_1"))); - expectedExtents.add(new KeyExtent(null, new Text("20170601_0"))); - expectedExtents.add(new KeyExtent(new Text("20170602_9c"), new Text("20170603_0"))); - expectedExtents.add(new KeyExtent(new Text("20170603_0a"), new Text("20170603_0b"))); - expectedExtents.add(new KeyExtent(new Text("20170603_0b"), new Text("20170603_0c"))); + Set expectedExtents = new HashSet<>(); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_0"), new Text("20170601_1"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_8"), new Text("20170601_9"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_0"), new Text("20170602_1"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_9"), null)); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0c"), new Text("20170603_1"))); + expectedExtents.add(new LoadPlan.TableSplits(null, new Text("20170601_0"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_9c"), new Text("20170603_0"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0a"), new Text("20170603_0b"))); + expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0b"), new Text("20170603_0c"))); List tableSplits = getSplits(); - Set extents = rfileRows.stream().map(row -> findKeyExtent(row, tableSplits)).collect(Collectors.toCollection(HashSet::new)); + Set extents = rfileRows.stream().map(row -> findContainingSplits(row, tableSplits)) + .collect(Collectors.toCollection(HashSet::new)); assertEquals(expectedExtents, extents); } @@ -479,37 +478,28 @@ protected Map getShardLocations(String tableName) throws IOExceptio } @Override - protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) { + protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) { filenames.add(filename); - return new SizeTrackingWriter(new FileSKVWriter() { + return new SizeTrackingWriter(null) { + public void startNewLocalityGroup(String name, Set columnFamilies) throws IOException { - @Override - public boolean supportsLocalityGroups() { - return false; } - @Override - public void startNewLocalityGroup(String name, Set columnFamilies) throws IOException {} + public void startDefaultLocalityGroup() throws IOException { - @Override - public void startDefaultLocalityGroup() throws IOException {} + } - @Override - public DataOutputStream createMetaStore(String name) throws IOException { - return null; + public void append(Key key, Value value) throws IOException { + entries++; + size += key.getLength() + (value == null ? 
0 : value.getSize()); } - @Override public void close() throws IOException {} - @Override - public long getLength() throws IOException { - return 0; + public LoadPlan getLoadPlan(String filename) { + return LoadPlan.builder().build(); } - - @Override - public void append(Key key, Value value) throws IOException {} - }, false); + }; } }; diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java index d9672d6b28c..db3c705d616 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java @@ -532,18 +532,18 @@ public void testGetSplitsNoArgument() throws IOException { Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut); - Map> resultsSet = uut.getSplits(); + Map> resultsSet = uut.getSplits(); Assert.assertNotNull("TableSplitsCache#getSplits() failed created a map of tables and their splits", resultsSet); Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated map of tables and their splits", resultsSet.isEmpty()); Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated map of tables and their splits", 3, resultsSet.size()); - List listings = new ArrayList(resultsSet.get("shard")); + List listings = new ArrayList(resultsSet.get("shard").get()); Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", listings); Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", listings.isEmpty()); Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated the list of splits", 5, listings.size()); - listings = new ArrayList(resultsSet.get("shard1")); + listings = new ArrayList(resultsSet.get("shard1").get()); Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", listings); Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", listings.isEmpty()); Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated the list of splits", 1, listings.size()); @@ -566,7 +566,7 @@ public void testGetSplitsWithArgumentThatMatches() throws IOException { Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut); - List resultsSet = uut.getSplits().get("shard"); + List resultsSet = uut.getSplits().get("shard").get(); Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", resultsSet); Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", resultsSet.isEmpty()); @@ -589,7 +589,7 @@ public void testGetSplitsWithArgumentThatDoesNotMatch() throws IOException { Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut); - List resultsSet = uut.getSplits("bad-table"); + List resultsSet = uut.getSplits("bad-table").get(); Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", resultsSet); Assert.assertTrue("TableSplitsCache#getSplits() incorrectly populated the list of splits", resultsSet.isEmpty());
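
For reference, the flow the commit message describes (register a split resolver before any key is written, pull the LoadPlan off the writer after close, and move it around as JSON) can be sketched roughly as follows, outside the patch itself. The sketch only reuses API shapes that already appear in the hunks above and that come from the unreleased accumulo PR 4898 (withSplitResolver, LoadPlan.TableSplits, RFileWriter.getLoadPlan, LoadPlan.toJson/fromJson); the file path, split values, and key are made-up placeholders.

    import java.util.Collections;
    import java.util.List;

    import org.apache.accumulo.core.client.rfile.RFile;
    import org.apache.accumulo.core.client.rfile.RFileWriter;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Text;

    public class LoadPlanFlowSketch {

        // Same shape as MultiRFileOutputFormatter.findContainingSplits in this patch:
        // one binary search over the sorted splits per resolver callback.
        static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List<Text> tableSplits) {
            int position = Collections.binarySearch(tableSplits, lookupRow);
            if (position < 0) {
                position = -1 * (position + 1);
            }
            Text prevRow = position == 0 ? null : tableSplits.get(position - 1);
            Text endRow = position == tableSplits.size() ? null : tableSplits.get(position);
            return new LoadPlan.TableSplits(prevRow, endRow);
        }

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            // Splits must be known up front, which is why the patch fetches them when the writer is opened.
            List<Text> splits = List.of(new Text("20170601"), new Text("20170602"));

            String file = "/tmp/example.rf"; // placeholder path
            RFileWriter writer = RFile.newWriter().to(file).withFileSystem(fs)
                            .withSplitResolver(row -> findContainingSplits(row, splits)).build();
            writer.append(new Key(new Text("20170601_1"), new Text("cf"), new Text("cq")), new Value());
            writer.close();

            // The writer accumulated the plan while keys were appended; serialize it for the loader.
            String json = writer.getLoadPlan("example.rf").toJson();

            // BulkIngestMapFileLoader side: rebuild and merge plans read back from HDFS.
            LoadPlan merged = LoadPlan.builder().addPlan(LoadPlan.fromJson(json)).build();
            System.out.println("Destinations: " + merged.getDestinations().size());
        }
    }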
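
Similarly, the SortedList rationale in the commit message (binary search on an unsorted list fails silently rather than throwing) can be illustrated with a small snippet; the split values here are arbitrary and only the SortedList class added by this patch is assumed.

    import java.util.Collections;
    import java.util.List;

    import datawave.ingest.mapreduce.job.SortedList;

    public class SortedListMotivation {
        public static void main(String[] args) {
            // Out-of-order splits, e.g. from a hand-edited or corrupted splits cache file.
            List<String> unsorted = List.of("20170603", "20170601", "20170602");

            // No exception is thrown, but the result is undefined for an unsorted list,
            // which is how an incorrect load plan could be produced without any visible error.
            int bogus = Collections.binarySearch(unsorted, "20170601");

            // SortedList.fromSorted detects the problem, logs a warning, and returns a
            // sorted immutable copy, so later binary searches behave as expected.
            List<String> guarded = SortedList.fromSorted(unsorted).get();
            int index = Collections.binarySearch(guarded, "20170601"); // 0

            System.out.println("unsorted search: " + bogus + ", guarded search: " + index);
        }
    }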