WIP adapt DW PR#2568 to use accumulo PR#4898
These draft changes build on NationalSecurityAgency#2568 with the following differences.

 * Compute bulkv2 load plans using new unreleased APIs in accumulo PR
   4898
 * The table splits are loaded at the beginning of writing to rfiles
   instead of at the end.  Not sure about the overall implications of
   this change on memory use in reducers.  The load plan could be
   computed after the rfile is closed using a new API in 4898 if
   deferring the loading of tablet splits is desired.
 * Switches to using accumulo public APIs for writing rfiles instead of
   internal accumulo methods (well, public once they are actually
   released).
 * The algorithm to compute the load plan does less work per key/value.
   Should be roughly constant time vs log(N); see the sketch after this
   list.
 * Adds a simple SortedList class.  The reason this was added is that
   this code does binary searches on lists, however it was not certain
   those lists were actually sorted.  If a list was not sorted it would
   not cause exceptions in binary search but could lead to incorrect load
   plans and lost data.  The new SortedList class ensures lists are
   sorted and allows this assurance to travel around in the code.  Maybe
   this change should be its own PR.
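
The sketch below illustrates the intent of the load plan bullets above; it is
not the actual MultiRFileOutputFormatter code.  The loop, variable names, and
the contains() helper are hypothetical; only SortedList, SplitsFile.getSplits,
findContainingSplits, LoadPlan.TableSplits, and the toJson()/fromJson() round
trip appear in the diff below.  Splits are read before any rfile is written,
each row is first checked against the extent the previous row fell into
(constant time), and a log(N) binary search over the sorted splits only
happens on a miss:

    // Illustrative reducer-side flow; hypothetical pieces are marked.
    SortedList<Text> splits = SplitsFile.getSplits(conf, tableName);   // loaded up front
    Set<LoadPlan.TableSplits> extents = new HashSet<>();
    LoadPlan.TableSplits current = null;              // extent the previous row fell into

    for (Text row : rowsInWriteOrder) {               // hypothetical: rows as written to the rfile
        if (current == null || !contains(current, row)) {    // contains() is a hypothetical helper
            // miss: binary search over the verified-sorted split points
            current = findContainingSplits(row, splits.get());
            extents.add(current);
        }
        // append the key/value for this row to the rfile ...
    }
    // Each extent becomes one entry in the LoadPlan built for this rfile.  The
    // plan is serialized with plan.toJson(); BulkIngestMapFileLoader later merges
    // the per-file plans with LoadPlan.fromJson(...) and Builder.addPlan(...).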
keith-turner committed Sep 30, 2024
1 parent a2430be commit 1f9ec76
Showing 11 changed files with 177 additions and 284 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -38,7 +38,7 @@
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<surefire.forkCount>1C</surefire.forkCount>
<version.accumulo>2.1.2</version.accumulo>
<version.accumulo>2.1.4-97e4684860</version.accumulo>
<version.arquillian>1.4.1.Final</version.arquillian>
<version.arquillian-weld-ee-embedded>1.0.0.Final</version.arquillian-weld-ee-embedded>
<version.assertj>3.20.2</version.assertj>
@@ -88,7 +88,6 @@
*/
public final class BulkIngestMapFileLoader implements Runnable {
private static final Logger log = Logger.getLogger(BulkIngestMapFileLoader.class);
private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();
private static int SLEEP_TIME = 30000;
private static int FAILURE_SLEEP_TIME = 10 * 60 * 1000; // 10 minutes
private static int MAX_DIRECTORIES = 1;
@@ -1014,13 +1013,13 @@ private LoadPlan getLoadPlan() throws IOException {
in.readFully(0, buffer);
String s = new String(buffer, StandardCharsets.UTF_8);
// TODO: Use Gson streaming api instead to minimize impact on heap and cpu
builder.addPlan(gson.fromJson(s, LoadPlan.class));
builder.addPlan(LoadPlan.fromJson(s));
}
}
LoadPlan lp = builder.build();
log.debug("Completed deserializing load plan for " + tableDir);
if (log.isTraceEnabled()) {
log.trace("Consolidated LoadPlan for " + tableDir + ": " + gson.toJson(lp));
log.trace("Consolidated LoadPlan for " + tableDir + ": " + lp.toJson());
}
return lp;
}

Large diffs are not rendered by default.

@@ -0,0 +1,77 @@
package datawave.ingest.mapreduce.job;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.log4j.Logger;

/**
* Wraps a list that is immutable and verified as sorted.
*/
public class SortedList<T> {

private static final Logger log = Logger.getLogger(SortedList.class);

private final List<T> list;

private SortedList(List<T> list) {
this.list = list;
}

public List<T> get() {
return list;
}

private static final SortedList<?> EMPTY = new SortedList<>(List.of());

@SuppressWarnings("unchecked")
public static <T2> SortedList<T2> empty() {
return (SortedList<T2>) EMPTY;
}

/**
* For a list that is expected to be sorted, this verifies that it is sorted and, if so, returns an immutable copy of it. If the list is not sorted, it logs a
* warning, copies it, sorts the copy, and returns an immutable version of the copy.
*/
public static <T2> SortedList<T2> fromSorted(List<T2> list) {
if (list.isEmpty()) {
return empty();
}

var copy = List.copyOf(list);

// verify after copying because nothing can change at this point
boolean isSorted = true;
for (int i = 1; i < copy.size(); i++) {
@SuppressWarnings("unchecked")
var prev = (Comparable<? super T2>) copy.get(i - 1);
if (prev.compareTo(copy.get(i)) > 0) {
isSorted = false;
}
}

if (isSorted) {
return new SortedList<>(copy);
} else {
log.warn("Input list of size " + copy.size() + " was expected to be sorted but was not", new IllegalArgumentException());
return fromUnsorted(copy);
}
}

/**
* Copies a list and sorts the copy returning an immutable version of the copy.
*/
public static <T2> SortedList<T2> fromUnsorted(List<T2> list) {
if (list.isEmpty()) {
return empty();
}

var copy = new ArrayList<>(list);
@SuppressWarnings("unchecked")
var comparator = (Comparator<? super T2>) Comparator.naturalOrder();
copy.sort(comparator);
return new SortedList<>(Collections.unmodifiableList(copy));
}
}
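
A brief usage sketch for the class above (not part of the diff; the split
values are made up).  Code that receives a SortedList can binary search its
backing list without re-checking the ordering:

    // Not part of the diff; illustrative values only.
    List<Text> raw = List.of(new Text("20170601_0"), new Text("20170601_1"), new Text("20170602_0"));
    SortedList<Text> splits = SortedList.fromSorted(raw);     // verified sorted, immutable
    // SortedList.fromUnsorted(raw) would copy and sort first if the ordering were unknown.

    int i = Collections.binarySearch(splits.get(), new Text("20170601_5"));
    int insertionPoint = i >= 0 ? i : -i - 1;                 // 2 for these values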
@@ -186,7 +186,7 @@ private static boolean shardsAreBalanced(Map<Text,String> locations, String date
return dateIsBalanced;
}

public static Map<String,List<Text>> getSplits(Configuration conf) throws IOException {
public static Map<String,SortedList<Text>> getSplits(Configuration conf) throws IOException {
return TableSplitsCache.getCurrentCache(conf).getSplits();

}
Expand All @@ -195,7 +195,7 @@ public static Map<Text,String> getSplitsAndLocations(Configuration conf, String
return TableSplitsCache.getCurrentCache(conf).getSplitsAndLocationByTable(tableName);
}

public static List<Text> getSplits(Configuration conf, String tableName) throws IOException {
public static SortedList<Text> getSplits(Configuration conf, String tableName) throws IOException {
return TableSplitsCache.getCurrentCache(conf).getSplits(tableName);
}
}
@@ -69,7 +69,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil {

private Path splitsPath = null;
private Map<String,Map<Text,String>> splitLocations = new HashMap<>();
private Map<String,List<Text>> splits = new HashMap<>();
private Map<String,SortedList<Text>> splits = new HashMap<>();

private PartitionerCache partitionerCache;

@@ -116,6 +116,7 @@ public static void clear() {
* maximum number of splits to return
* @return split points grouped into fewer evenly grouped splits so as not to exceed maxSplits
*/
// TODO seems like this code is only used in test
public static List<Text> trimSplits(List<Text> tableSplits, int maxSplits) {
if (tableSplits.size() <= maxSplits) {
return tableSplits;
@@ -363,6 +364,7 @@ protected void readCache(BufferedReader in) throws IOException {
String line;
String tableName = null;
Map tmpSplitLocations = new ShardLocationTrieMap();
Map<String,List<Text>> tableSplits = new HashMap<>();
List<Text> tmpSplits = null;

while ((line = in.readLine()) != null) {
@@ -374,7 +376,7 @@ protected void readCache(BufferedReader in) throws IOException {
tableName = parts[0];
tmpSplitLocations = new ShardLocationTrieMap();
tmpSplits = new ArrayList<>();
this.splits.put(tableName, Collections.unmodifiableList(tmpSplits));
tableSplits.put(tableName, tmpSplits);
}
if (parts.length >= 2) {
Text split = new Text(Base64.decodeBase64(parts[1]));
@@ -387,6 +389,11 @@ protected void readCache(BufferedReader in) throws IOException {
if (!tmpSplitLocations.isEmpty()) {
this.splitLocations.put(tableName, tmpSplitLocations);
}

tableSplits.forEach((table, splits) -> {
this.splits.put(table, SortedList.fromSorted(splits));
});

in.close();
}

@@ -407,12 +414,12 @@ private String dedup(Map<String,String> dedupMap, String value) {
* @throws IOException
* for issues with read or write
*/
public List<Text> getSplits(String table) throws IOException {
public SortedList<Text> getSplits(String table) throws IOException {
if (this.splits.isEmpty()) {
read();
}
List<Text> splitList = this.splits.get(table);
return (splitList == null ? Collections.emptyList() : splitList);
SortedList<Text> splitList = this.splits.get(table);
return (splitList == null ? SortedList.empty() : splitList);
}

/**
@@ -425,16 +432,17 @@ public List<Text> getSplits(String table) throws IOException {
* @throws IOException
* for issues with read or write
*/
// TODO seems like this code is only used in test, can it be removed? Looked into making it return SortedList
public List<Text> getSplits(String table, int maxSplits) throws IOException {
return trimSplits(getSplits(table), maxSplits);
return trimSplits(getSplits(table).get(), maxSplits);
}

/**
* @return map of table name to list of splits for the table
* @throws IOException
* for issues with read or write
*/
public Map<String,List<Text>> getSplits() throws IOException {
public Map<String,SortedList<Text>> getSplits() throws IOException {
if (this.splits.isEmpty())
read();
return Collections.unmodifiableMap(splits);
@@ -23,6 +23,7 @@
import datawave.ingest.mapreduce.handler.shard.ShardIdFactory;
import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;
import datawave.ingest.mapreduce.job.BulkIngestKey;
import datawave.ingest.mapreduce.job.SortedList;
import datawave.ingest.mapreduce.job.SplitsFile;
import datawave.util.time.DateHelper;

@@ -134,7 +135,7 @@ private HashMap<Text,Integer> getPartitionsByShardId(String tableName) throws IO
if (log.isDebugEnabled())
log.debug("Loading splits data for " + tableName);

List<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
SortedList<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
Map<Text,String> shardIdToLocation = SplitsFile.getSplitsAndLocations(conf, tableName);

if (log.isDebugEnabled())
@@ -158,7 +159,7 @@ private HashMap<Text,Integer> getPartitionsByShardId(String tableName) throws IO
* the map of shard ids and their location
* @return shardId to
*/
private HashMap<Text,Integer> assignPartitionsForEachShard(List<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
private HashMap<Text,Integer> assignPartitionsForEachShard(SortedList<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations);

HashMap<String,Integer> partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations);
@@ -178,11 +179,11 @@ private int calculateNumberOfUniqueTservers(Map<Text,String> shardIdToLocations)
return totalNumUniqueTServers;
}

private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, List<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, SortedList<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
HashMap<String,Integer> partitionsByTServer = new HashMap<>(totalNumTServers);
int nextAvailableSlot = 0;
boolean alreadySkippedFutureShards = false;
for (Text shard : sortedShardIds) {
for (Text shard : sortedShardIds.get()) {
if (alreadySkippedFutureShards || !isFutureShard(shard)) { // short circuiting for performance
alreadySkippedFutureShards = true;
String location = shardIdsToTservers.get(shard);
@@ -18,6 +18,7 @@
import org.apache.log4j.Logger;

import datawave.ingest.mapreduce.job.BulkIngestKey;
import datawave.ingest.mapreduce.job.SortedList;
import datawave.ingest.mapreduce.job.SplitsFile;

/**
@@ -82,7 +83,7 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {

String tableName = key.getTableName().toString();

List<Text> cutPointArray = null;
SortedList<Text> cutPointArray = null;
try {
cutPointArray = SplitsFile.getSplits(conf, tableName);
} catch (IOException e) {
@@ -92,8 +93,8 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
key.getKey().getRow(holder);
int index = Collections.binarySearch(cutPointArray, holder);
index = calculateIndex(index, numPartitions, tableName, cutPointArray.size());
int index = Collections.binarySearch(cutPointArray.get(), holder);
index = calculateIndex(index, numPartitions, tableName, cutPointArray.get().size());

index = partitionLimiter.limit(numPartitions, index);

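
calculateIndex() is not shown in this diff, so the following is only a
plausible reading of how the binary search result above could map to a
partition, not the actual implementation:

    // Hypothetical illustration -- the real calculateIndex() is not in this diff.
    // A negative binarySearch result encodes the insertion point, i.e. the index
    // of the tablet the row falls into; tables with more splits than reducers wrap.
    static int calculateIndexSketch(int searchResult, int numPartitions, int numSplits) {
        int tabletIndex = searchResult >= 0 ? searchResult : -searchResult - 1;   // 0..numSplits
        return tabletIndex % numPartitions;
    }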
@@ -1,7 +1,7 @@
package datawave.ingest.mapreduce.job;

import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findKeyExtent;
import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findContainingSplits;
import static org.junit.Assert.assertEquals;

import java.io.DataOutputStream;
@@ -28,7 +28,6 @@
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -49,7 +48,6 @@

import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.KeyExtent;
import datawave.util.TableName;

public class MultiRFileOutputFormatterTest {
@@ -235,19 +233,20 @@ public void testPlanning() {
rfileRows.add(new Text("20170601_9"));
rfileRows.add(new Text("20200601_9"));

Set<KeyExtent> expectedExtents = new HashSet<>();
expectedExtents.add(new KeyExtent(new Text("20170601_0"), new Text("20170601_1")));
expectedExtents.add(new KeyExtent(new Text("20170601_8"), new Text("20170601_9")));
expectedExtents.add(new KeyExtent(new Text("20170602_0"), new Text("20170602_1")));
expectedExtents.add(new KeyExtent(new Text("20170603_9"), null));
expectedExtents.add(new KeyExtent(new Text("20170603_0c"), new Text("20170603_1")));
expectedExtents.add(new KeyExtent(null, new Text("20170601_0")));
expectedExtents.add(new KeyExtent(new Text("20170602_9c"), new Text("20170603_0")));
expectedExtents.add(new KeyExtent(new Text("20170603_0a"), new Text("20170603_0b")));
expectedExtents.add(new KeyExtent(new Text("20170603_0b"), new Text("20170603_0c")));
Set<LoadPlan.TableSplits> expectedExtents = new HashSet<>();
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_0"), new Text("20170601_1")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_8"), new Text("20170601_9")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_0"), new Text("20170602_1")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_9"), null));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0c"), new Text("20170603_1")));
expectedExtents.add(new LoadPlan.TableSplits(null, new Text("20170601_0")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_9c"), new Text("20170603_0")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0a"), new Text("20170603_0b")));
expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0b"), new Text("20170603_0c")));

List<Text> tableSplits = getSplits();
Set<KeyExtent> extents = rfileRows.stream().map(row -> findKeyExtent(row, tableSplits)).collect(Collectors.toCollection(HashSet::new));
Set<LoadPlan.TableSplits> extents = rfileRows.stream().map(row -> findContainingSplits(row, tableSplits))
.collect(Collectors.toCollection(HashSet::new));

assertEquals(expectedExtents, extents);
}
@@ -479,37 +478,28 @@ protected Map<Text,String> getShardLocations(String tableName) throws IOExceptio
}

@Override
- protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) {
+ protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) {
filenames.add(filename);
- return new SizeTrackingWriter(new FileSKVWriter() {
- @Override
- public boolean supportsLocalityGroups() {
- return false;
- }
- @Override
- public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {}
- @Override
- public void startDefaultLocalityGroup() throws IOException {}
- @Override
- public DataOutputStream createMetaStore(String name) throws IOException {
- return null;
- }
- @Override
- public void close() throws IOException {}
- @Override
- public long getLength() throws IOException {
- return 0;
- }
- @Override
- public void append(Key key, Value value) throws IOException {}
- }, false);
+ return new SizeTrackingWriter(null) {
+ public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {}
+ public void startDefaultLocalityGroup() throws IOException {}
+ public void append(Key key, Value value) throws IOException {
+ entries++;
+ size += key.getLength() + (value == null ? 0 : value.getSize());
+ }
+ public LoadPlan getLoadPlan(String filename) {
+ return LoadPlan.builder().build();
+ }
+ };
}
};

Expand Down
Loading

0 comments on commit 1f9ec76

Please sign in to comment.