Commit a2430be

asdf

keith-ratcliffe committed Sep 20, 2024
1 parent 7c11abd commit a2430be
Showing 1 changed file with 105 additions and 80 deletions.
@@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.RandomAccess;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -306,7 +305,7 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
// don't bother if this writer has not been used yet
if (file != null) {
if (writer.isLoadPlanning()) {
plan(writer, table, file);
computeLoadPlan(writer, table, file);
}
writer.close();
// pull the index off the filename
@@ -317,7 +316,7 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
}

/**
* Init table's load plan and compute the plan asynchronously for the rfile
* Initializes the table's LoadPlan list and computes the plan for the given rfile asynchronously
*
* @param writer
* RFile writer
@@ -326,17 +325,67 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
* @param rfile
* rfile path
*/
protected void plan(SizeTrackingWriter writer, String tableName, Path rfile) {
protected void computeLoadPlan(SizeTrackingWriter writer, String tableName, Path rfile) {
if (!loadPlans.containsKey(tableName)) {
loadPlans.put(tableName, new LinkedList<>());
}
// @formatter:off
loadPlanFutures.add(CompletableFuture
.supplyAsync(() -> createLoadPlan(rfile, writer.rows, tableName))
.thenAccept(plan -> loadPlans.get(tableName).add(plan)));
.supplyAsync(() -> compute(rfile, writer.rows, tableName, conf))
.thenAccept(plan -> {
if (plan != null) {
loadPlans.get(tableName).add(plan);
}
}));
// @formatter:on
}

protected void commitLoadPlans(TaskAttemptContext context) {
loadPlanFutures.stream().forEach(f -> {
try {
f.get();
} catch (Exception e) {
log.error("Load planning failed", e);
throw new RuntimeException(e);
}
});
writeLoadPlans(context);
}
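
// A minimal alternative sketch (not part of this commit): the same barrier could be
// expressed with CompletableFuture.allOf, which yields one future that completes only
// after every plan future has completed, and completes exceptionally if any of them
// failed. Assumes loadPlanFutures is a Collection<CompletableFuture<Void>> as used
// above; the method name is hypothetical.
protected void commitLoadPlansViaAllOf(TaskAttemptContext context) {
    CompletableFuture.allOf(loadPlanFutures.toArray(new CompletableFuture[0])).join();
    writeLoadPlans(context);
}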

private void writeLoadPlans(TaskAttemptContext context) {
log.debug("Writing bulk load plans to disk for all tables");
// Consolidate all plans for a table into a single file
List<CompletableFuture<Void>> futures = new LinkedList<>();
for (Map.Entry<String,List<LoadPlan>> entry : loadPlans.entrySet()) {
var builder = LoadPlan.builder();
entry.getValue().stream().forEach(plan -> builder.addPlan(plan));
var table = entry.getKey();
var path = new Path(String.format("%s/%s", workDir, table), getUniqueFile(context, "loadplan", ".json"));
var loadPlan = builder.build();
// TODO: Use Gson streaming api instead (JsonWriter) to minimize impact on heap
//@formatter:off
futures.add(CompletableFuture.runAsync(() -> {
try (FSDataOutputStream out = fs.create(path)) {
log.debug("Begin writing load plan for " + path);
out.write(gson.toJson(loadPlan).getBytes(StandardCharsets.UTF_8));
log.debug("Completed writing load plan for " + path);
} catch (IOException ioe) {
log.error("Failed to write plan for " + path, ioe);
throw new RuntimeException(ioe);
}
}));
//@formatter:on
}
futures.stream().forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.debug("Finished writing bulk load plans to disk");
}
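
// A hedged sketch of the JsonWriter TODO above (not part of this commit): streaming the
// serialized plan straight to the output stream avoids materializing the full JSON
// string on the heap first. Assumes the gson and fs fields used above, plus imports of
// java.io.OutputStreamWriter and com.google.gson.stream.JsonWriter; the method name is
// illustrative only.
private void writeLoadPlanStreaming(Path path, LoadPlan loadPlan) {
    try (FSDataOutputStream out = fs.create(path);
            JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
        // Gson emits JSON tokens directly to the underlying stream as it serializes
        gson.toJson(loadPlan, LoadPlan.class, jsonWriter);
    } catch (IOException ioe) {
        throw new RuntimeException("Failed to stream plan for " + path, ioe);
    }
}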

/**
* Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
*
@@ -346,55 +395,75 @@ protected void plan(SizeTrackingWriter writer, String tableName, Path rfile) {
* Set of rows contained in the RFile
* @param tableName
* Table whose splits are to be examined to create the mapping
* @return LoadPlan for the RFile.
* @param conf
* The configuration required to retrieve table splits
* @return LoadPlan for the RFile
*/
protected LoadPlan createLoadPlan(Path rfile, SortedSet<Text> rfileRows, String tableName) {
var builder = LoadPlan.builder();
static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, String tableName, Configuration conf) {
if (rfileRows != null && !rfileRows.isEmpty()) {
List<Text> tableSplits;
try {
tableSplits = SplitsFile.getSplits(conf, tableName);
var splits = SplitsFile.getSplits(conf, tableName);
return compute(rfile, rfileRows, splits);
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve splits!", e);
}
// @formatter:off
if (tableSplits == null || tableSplits.isEmpty()) {
// RFile examination and mapping are handled later by bulk import client
log.debug("Calculating FILE load plan for " + rfile);
var ke = new KeyExtent(rfileRows.first(), rfileRows.last());
builder.addPlan(LoadPlan.builder().loadFileTo(
rfile.getName(), RangeType.FILE, ke.prevEndRow, ke.endRow).build());
} else {
// Calculate extent mapping, so that import client can skip examination/mapping at load time
log.debug("Calculating TABLE load plan for " + rfile);
rfileRows.stream().map(row -> findKeyExtent(row, tableSplits))
.collect(Collectors.toCollection(HashSet::new))
.forEach(ke -> builder.addPlan(LoadPlan.builder().loadFileTo(
rfile.getName(), RangeType.TABLE, ke.prevEndRow, ke.endRow).build())
);
log.debug("Table load plan completed for file: " + rfile);
}
// @formatter:on
}
return null;
}

/**
* Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
*
* @param rfile
* RFile path
* @param rfileRows
* Set of rows contained in the RFile
* @param tableSplits
* Splits for the table being targeted
* @return LoadPlan for the RFile.
*/
static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, List<Text> tableSplits) {
// @formatter:off
var builder = LoadPlan.builder();
if (tableSplits == null || tableSplits.isEmpty()) {
// RFile examination and mapping handled later by bulk import client
log.debug("Calculating FILE load plan for " + rfile);
var ke = new KeyExtent(rfileRows.first(), rfileRows.last());
builder.addPlan(LoadPlan.builder().loadFileTo(
rfile.getName(), RangeType.FILE, ke.prevEndRow, ke.endRow).build());
} else {
// Compute extent mapping so that we can skip examination at load time
log.debug("Calculating TABLE load plan for " + rfile);
rfileRows.stream()
.map(row -> findKeyExtent(row, tableSplits))
.collect(Collectors.toCollection(HashSet::new))
.forEach(ke -> builder.addPlan(LoadPlan.builder().loadFileTo(
rfile.getName(), RangeType.TABLE, ke.prevEndRow, ke.endRow).build())
);
log.debug("Table load plan completed for file: " + rfile);
}
// @formatter:on
return builder.build();
}
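
// Usage sketch (illustrative, not part of this commit; the path and row values are
// hypothetical). With no splits the result is a single FILE-type range spanning the
// file's first and last rows; with splits {"g","q"} the rows below map onto the tablet
// extents (null,"g"], ("g","q"], and ("q",null).
static void computeUsageSketch() {
    SortedSet<Text> rows = new TreeSet<>(Set.of(new Text("a"), new Text("m"), new Text("z")));
    Path rfile = new Path("/tmp/example.rf");
    LoadPlan filePlan = compute(rfile, rows, List.of());
    LoadPlan tablePlan = compute(rfile, rows, List.of(new Text("g"), new Text("q")));
}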

/**
* Finds the KeyExtent where the specified row should reside
*
* @param lookupRow
* row value to be mapped
* Row value to be mapped
* @param tableSplits
* splits for the table in question
* Splits for the table in question
* @return KeyExtent mapping for the given row
*/
static KeyExtent findKeyExtent(Text lookupRow, List<Text> tableSplits) {
var ke = new KeyExtent();
var ceilingIdx = findCeiling(lookupRow, tableSplits);
if (ceilingIdx == -1) {
// Last tablet (endRow remains null)
ke.prevEndRow = tableSplits.get(tableSplits.size() - 1);
} else {
ke.endRow = tableSplits.get(ceilingIdx);
// If ceiling == 0, then first tablet (prevEndRow remains null)
if (ceilingIdx > 0) {
ke.prevEndRow = tableSplits.get(ceilingIdx - 1);
}
@@ -406,9 +475,9 @@ static KeyExtent findKeyExtent(Text lookupRow, List<Text> tableSplits) {
* Performs binary search on tableSplits to find the index of the first (least) split &ge; lookupRow
*
* @param lookupRow
* row for which we want to find the ceiling
* Row for which we want to find the ceiling
* @param tableSplits
* sorted table tableSplits list whose implementation is assumed to provide fast random access (i.e., {@link java.util.RandomAccess})
* Sorted list of table splits whose implementation is assumed to provide fast random access (i.e., {@link java.util.RandomAccess})
* @return index of the first split &ge; lookupRow, or -1 if lookupRow &gt; tableSplits.get(tableSplits.size() - 1)
*/
static int findCeiling(Text lookupRow, List<Text> tableSplits) {
@@ -427,52 +496,6 @@ static int findCeiling(Text lookupRow, List<Text> tableSplits) {
return ceiling;
}

private void commitLoadPlans(TaskAttemptContext context) {
loadPlanFutures.stream().forEach(f -> {
try {
f.get();
} catch (Exception e) {
log.error("Load planning failed", e);
throw new RuntimeException(e);
}
});
writeLoadPlans(context);
}

private void writeLoadPlans(TaskAttemptContext context) {
log.debug("Writing bulk load plans to disk for all tables");
// Consolidate all plans for a table into a single file
List<CompletableFuture<Void>> futures = new LinkedList<>();
for (Map.Entry<String,List<LoadPlan>> entry : loadPlans.entrySet()) {
var builder = LoadPlan.builder();
var table = entry.getKey();
var path = new Path(String.format("%s/%s", workDir, table), getUniqueFile(context, "loadplan", ".json"));
entry.getValue().stream().forEach(plan -> builder.addPlan(plan));
var loadPlan = builder.build();
// TODO: Use Gson streaming api instead (JsonWriter) to minimize impact on heap
//@formatter:off
futures.add(CompletableFuture.runAsync(() -> {
try (FSDataOutputStream out = fs.create(path)) {
log.debug("Begin writing load plan for " + path);
out.write(gson.toJson(loadPlan).getBytes(StandardCharsets.UTF_8));
log.debug("Completed writing load plan for " + path);
} catch (IOException ioe) {
log.error("Failed to write plan for " + path, ioe);
throw new RuntimeException(ioe);
}
}));
//@formatter:on
}
futures.stream().forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.debug("Finished writing bulk load plans to disk");
}

public static class SizeTrackingWriter implements FileSKVWriter {
private FileSKVWriter delegate;
private boolean loadPlanning;
@@ -775,7 +798,9 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
for (Map.Entry<String,SizeTrackingWriter> entry : writers.entrySet()) {
var writer = entry.getValue();
if (writer.isLoadPlanning()) {
plan(writer, writerTableNames.get(entry.getKey()), usedWriterPaths.get(entry.getKey()));
var tableName = writerTableNames.get(entry.getKey());
var rfilePath = usedWriterPaths.get(entry.getKey());
computeLoadPlan(writer, tableName, rfilePath);
}
writer.close();
}
