diff --git a/contrib/datawave-quickstart/bin/services/datawave/bootstrap-ingest.sh b/contrib/datawave-quickstart/bin/services/datawave/bootstrap-ingest.sh
index bd689ec71d1..09af6bbab08 100644
--- a/contrib/datawave-quickstart/bin/services/datawave/bootstrap-ingest.sh
+++ b/contrib/datawave-quickstart/bin/services/datawave/bootstrap-ingest.sh
@@ -9,7 +9,7 @@ DW_DATAWAVE_INGEST_HOME="${DW_CLOUD_HOME}/${DW_DATAWAVE_INGEST_SYMLINK}"
# ingest reducers. Set to 1 for standalone instance, but typically set to the first prime number that is less than the
# number of available Accumulo tablet servers...
-DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-1}
+DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-10}
# Ingest job logs, etc
@@ -39,7 +39,7 @@ DW_DATAWAVE_INGEST_FLAGFILE_DIR="${DW_DATAWAVE_DATA_DIR}/flags"
# Comma-delimited list of configs for the FlagMaker process(es)
-DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml"}
+DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml,${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-bulk.xml"}
# Dir for ingest-related 'pid' files
@@ -72,7 +72,7 @@ DW_DATAWAVE_INGEST_LIVE_DATA_TYPES=${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES:-"wikipe
# Comma-delimited data type identifiers to be ingested via "bulk" ingest, ie via bulk import of RFiles into Accumulo tables
-DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats"}
+DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats,wikipedia,mycsv,myjson"}
DW_DATAWAVE_MAPRED_INGEST_OPTS=${DW_DATAWAVE_MAPRED_INGEST_OPTS:-"-useInlineCombiner -ingestMetricsDisabled"}
diff --git a/contrib/datawave-quickstart/bin/services/datawave/install-ingest.sh b/contrib/datawave-quickstart/bin/services/datawave/install-ingest.sh
index a2cd713d960..6990644fc7b 100755
--- a/contrib/datawave-quickstart/bin/services/datawave/install-ingest.sh
+++ b/contrib/datawave-quickstart/bin/services/datawave/install-ingest.sh
@@ -30,12 +30,14 @@ tar xf "${DW_DATAWAVE_SERVICE_DIR}/${DW_DATAWAVE_INGEST_DIST}" -C "${TARBALL_BAS
info "DataWave Ingest tarball extracted and symlinked"
+source "${THIS_DIR}/fix-hadoop-classpath.sh"
+
if ! hadoopIsRunning ; then
info "Starting Hadoop, so that we can initialize Accumulo"
hadoopStart
fi
-# Create any Hadoop directories related to Datawave Ingest
+# Create any Hadoop directories needed for live ingest input
if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
OLD_IFS="${IFS}"
@@ -44,10 +46,25 @@ if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
IFS="${OLD_IFS}"
for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+ # Dirs created here should be configured in your live flag maker config (e.g., in config/flag-maker-live.xml)
hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}" || fatal "Failed to create HDFS directory: ${dir}"
done
fi
+# Create any Hadoop directories needed for bulk ingest input
+if [[ -n "${DW_DATAWAVE_INGEST_BULK_DATA_TYPES}" ]] ; then
+
+ OLD_IFS="${IFS}"
+ IFS=","
+ HDFS_RAW_INPUT_DIRS=( ${DW_DATAWAVE_INGEST_BULK_DATA_TYPES} )
+ IFS="${OLD_IFS}"
+
+ for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+ # Dirs created here should be configured in your bulk flag maker config (e.g., in config/flag-maker-bulk.xml)
+ hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}-bulk" || fatal "Failed to create HDFS directory: ${dir}-bulk"
+ done
+fi
+
#----------------------------------------------------------
# Configure/update Accumulo classpath, set auths, etc
#----------------------------------------------------------
diff --git a/properties/dev.properties b/properties/dev.properties
index ea526d63026..55e84792edd 100644
--- a/properties/dev.properties
+++ b/properties/dev.properties
@@ -41,7 +41,7 @@ LIVE_CHILD_MAP_MAX_MEMORY_MB=1024
BULK_CHILD_REDUCE_MAX_MEMORY_MB=2048
LIVE_CHILD_REDUCE_MAX_MEMORY_MB=1024
-BULK_INGEST_DATA_TYPES=shardStats
+BULK_INGEST_DATA_TYPES=shardStats,wikipedia,mycsv,myjson
LIVE_INGEST_DATA_TYPES=wikipedia,mycsv,myjson
# Clear out these values if you do not want standard shard ingest.
diff --git a/warehouse/ingest-configuration/src/main/resources/config/ingest-config.xml b/warehouse/ingest-configuration/src/main/resources/config/ingest-config.xml
index b23a72e2670..c51abb4d599 100644
--- a/warehouse/ingest-configuration/src/main/resources/config/ingest-config.xml
+++ b/warehouse/ingest-configuration/src/main/resources/config/ingest-config.xml
@@ -91,4 +91,24 @@
        <name>partitioner.default.delegate</name>
        <value>datawave.ingest.mapreduce.partition.MultiTableRRRangePartitioner</value>
    </property>
+
+    <property>
+        <name>datawave.ingest.splits.cache.dir</name>
+        <value>${WAREHOUSE_HDFS_NAME_NODE}/data/splitsCache</value>
+    </property>
+
+    <property>
+        <name>accumulo.config.cache.path</name>
+        <value>${WAREHOUSE_HDFS_NAME_NODE}/data/accumuloConfigCache/accConfCache.txt</value>
+    </property>
+
+    <property>
+        <name>ingest.bulk.import.mode</name>
+        <value>V2_LOAD_PLANNING</value>
+        <description>
+            Must be one of [V1, V2_LOCAL_MAPPING, V2_LOAD_PLANNING]
+            (See BulkIngestMapFileLoader.ImportMode)
+        </description>
+    </property>
+
</configuration>
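As a rough sketch of how the new `ingest.bulk.import.mode` property is consumed downstream (both `BulkIngestMapFileLoader` and `MultiRFileOutputFormatter` read it through `Configuration.getEnum`, as shown in the Java changes below), here is a minimal example; the wrapper class and the explicit `addResource` call are illustrative only and not part of this change:

```java
import org.apache.hadoop.conf.Configuration;

import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader;
import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;

// Illustrative only: shows how the new ingest.bulk.import.mode property is read.
public class ImportModeCheck {
    public static void main(String[] args) {
        // In a deployed job, ingest-config.xml is already loaded as a Configuration resource
        Configuration conf = new Configuration();
        conf.addResource("ingest-config.xml");

        // Mirrors BulkIngestMapFileLoader: an unset property falls back to the legacy V1 path
        ImportMode mode = conf.getEnum(BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG, ImportMode.V1);

        // Load plans are only produced and consumed when V2_LOAD_PLANNING is selected
        boolean loadPlanning = mode == ImportMode.V2_LOAD_PLANNING;
        System.out.println("bulk import mode=" + mode + ", load planning enabled=" + loadPlanning);
    }
}
```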
diff --git a/warehouse/ingest-configuration/src/main/resources/config/log4j-bulkloader.xml b/warehouse/ingest-configuration/src/main/resources/config/log4j-bulkloader.xml
new file mode 100644
index 00000000000..2878273c05b
--- /dev/null
+++ b/warehouse/ingest-configuration/src/main/resources/config/log4j-bulkloader.xml
@@ -0,0 +1,20 @@
+<!-- log4j configuration for the BulkIngestMapFileLoader (appender and logger definitions; XML content not preserved in this extract) -->
diff --git a/warehouse/ingest-core/pom.xml b/warehouse/ingest-core/pom.xml
index d46f4a23f1b..081b5d7d87c 100644
--- a/warehouse/ingest-core/pom.xml
+++ b/warehouse/ingest-core/pom.xml
@@ -14,6 +14,11 @@
com.clearspring.analytics
stream
+
+ com.google.code.gson
+ gson
+ 2.9.0
+
com.sun.xml.bind
jaxb-impl
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/config/BaseHdfsFileCacheUtil.java b/warehouse/ingest-core/src/main/java/datawave/ingest/config/BaseHdfsFileCacheUtil.java
index 57f7d5f8991..8d96fc6c002 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/config/BaseHdfsFileCacheUtil.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/config/BaseHdfsFileCacheUtil.java
@@ -130,7 +130,7 @@ public Path createTempFile(FileSystem fs) throws IOException {
do {
Path parentDirectory = this.cacheFilePath.getParent();
String fileName = this.cacheFilePath.getName() + "." + count;
- log.info("Attempting to create " + fileName + "under " + parentDirectory);
+ log.info("Attempting to create " + fileName + " under " + parentDirectory);
tmpCacheFile = new Path(parentDirectory, fileName);
count++;
} while (!fs.createNewFile(tmpCacheFile));
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
index cfd066ee75b..b95a7a6a177 100755
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
@@ -6,14 +6,17 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +36,7 @@
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -40,6 +44,7 @@
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +67,15 @@
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.mapreduce.StandaloneStatusReporter;
@@ -73,7 +87,8 @@
* various tablet servers.
*/
public final class BulkIngestMapFileLoader implements Runnable {
- private static Logger log = Logger.getLogger(BulkIngestMapFileLoader.class);
+ private static final Logger log = Logger.getLogger(BulkIngestMapFileLoader.class);
+ private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();
private static int SLEEP_TIME = 30000;
private static int FAILURE_SLEEP_TIME = 10 * 60 * 1000; // 10 minutes
private static int MAX_DIRECTORIES = 1;
@@ -83,6 +98,7 @@ public final class BulkIngestMapFileLoader implements Runnable {
private static int SHUTDOWN_PORT = 24111;
private static boolean FIFO = true;
private static boolean INGEST_METRICS = true;
+ private static ImportMode BULK_IMPORT_MODE = ImportMode.V1;
public static final String CLEANUP_FILE_MARKER = "job.cleanup";
public static final String COMPLETE_FILE_MARKER = "job.complete";
@@ -90,6 +106,7 @@ public final class BulkIngestMapFileLoader implements Runnable {
public static final String FAILED_FILE_MARKER = "job.failed";
public static final String ATTEMPT_FILE_MARKER = "job.load.attempt.failed.do.not.delete";
public static final String INPUT_FILES_MARKER = "job.paths";
+ public static final String BULK_IMPORT_MODE_CONFIG = "ingest.bulk.import.mode";
private Path workDir;
private String jobDirPattern;
@@ -106,6 +123,29 @@ public final class BulkIngestMapFileLoader implements Runnable {
private volatile boolean running;
private ExecutorService executor;
private JobObservable jobObservable;
+ private int sleepTime;
+ private int failSleepTime;
+ private boolean writeStats;
+ private ImportMode importMode;
+
+ public enum ImportMode {
+ /**
+ * Accumulo's 1.x bulk api will be used to import rfiles.
+ */
+ @Deprecated
+ V1,
+ /**
+ * Accumulo's 2.x bulk api will be used to import rfiles. All rfile-to-tablet mappings are computed locally within the {@link BulkIngestMapFileLoader}
+ * JVM upon import. This will incur an import latency cost that is proportional to the size/number of rfiles to be imported and the number of tablets to
+ * be targeted
+ */
+ V2_LOCAL_MAPPING,
+ /**
+ * Accumulo's 2.x bulk api will be used to import rfiles. All rfile-to-tablet mappings are determined from precomputed
+ * {@link org.apache.accumulo.core.data.LoadPlan} files created in {@link MultiRFileOutputFormatter}
+ */
+ V2_LOAD_PLANNING
+ }
public static void main(String[] args) throws AccumuloSecurityException, IOException, NoSuchMethodException {
@@ -316,6 +356,8 @@ public static void main(String[] args) throws AccumuloSecurityException, IOExcep
}
}
+ BULK_IMPORT_MODE = conf.getEnum(BULK_IMPORT_MODE_CONFIG, ImportMode.V1);
+
log.info("Set sleep time to " + SLEEP_TIME + "ms");
log.info("Will wait to bring map files online if there are more than " + MAJC_THRESHOLD + " running or queued major compactions.");
log.info("Will not bring map files online unless at least " + MAJC_WAIT_TIMEOUT + "ms have passed since last time.");
@@ -330,6 +372,7 @@ public static void main(String[] args) throws AccumuloSecurityException, IOExcep
log.info("Using " + jobtracker + " as the jobtracker");
log.info("Using " + SHUTDOWN_PORT + " as the shutdown port");
log.info("Using " + (FIFO ? "FIFO" : "LIFO") + " processing order");
+ log.info("Using " + BULK_IMPORT_MODE + " bulk import mode");
for (String[] s : properties) {
conf.set(s[0], s[1]);
@@ -357,7 +400,8 @@ public static void main(String[] args) throws AccumuloSecurityException, IOExcep
String passwordStr = PasswordConverter.parseArg(args[5]);
BulkIngestMapFileLoader processor = new BulkIngestMapFileLoader(workDir, jobDirPattern, instanceName, zooKeepers, user, new PasswordToken(passwordStr),
- seqFileHdfs, srcHdfs, destHdfs, jobtracker, tablePriorities, conf, SHUTDOWN_PORT, numHdfsThreads, jobObservers);
+ seqFileHdfs, srcHdfs, destHdfs, jobtracker, tablePriorities, conf, SHUTDOWN_PORT, numHdfsThreads, jobObservers, SLEEP_TIME,
+ FAILURE_SLEEP_TIME, INGEST_METRICS, BULK_IMPORT_MODE);
Thread t = new Thread(processor, "map-file-watcher");
t.start();
}
@@ -365,18 +409,18 @@ public static void main(String[] args) throws AccumuloSecurityException, IOExcep
public BulkIngestMapFileLoader(String workDir, String jobDirPattern, String instanceName, String zooKeepers, String user, PasswordToken passToken,
URI seqFileHdfs, URI srcHdfs, URI destHdfs, String jobtracker, Map tablePriorities, Configuration conf) {
this(workDir, jobDirPattern, instanceName, zooKeepers, user, passToken, seqFileHdfs, srcHdfs, destHdfs, jobtracker, tablePriorities, conf,
- SHUTDOWN_PORT, 1, Collections.emptyList());
+ SHUTDOWN_PORT, 1, Collections.emptyList(), SLEEP_TIME, FAILURE_SLEEP_TIME, INGEST_METRICS, BULK_IMPORT_MODE);
}
public BulkIngestMapFileLoader(String workDir, String jobDirPattern, String instanceName, String zooKeepers, String user, PasswordToken passToken,
URI seqFileHdfs, URI srcHdfs, URI destHdfs, String jobtracker, Map tablePriorities, Configuration conf, int shutdownPort) {
this(workDir, jobDirPattern, instanceName, zooKeepers, user, passToken, seqFileHdfs, srcHdfs, destHdfs, jobtracker, tablePriorities, conf, shutdownPort,
- 1, Collections.emptyList());
+ 1, Collections.emptyList(), SLEEP_TIME, FAILURE_SLEEP_TIME, INGEST_METRICS, BULK_IMPORT_MODE);
}
public BulkIngestMapFileLoader(String workDir, String jobDirPattern, String instanceName, String zooKeepers, String user, PasswordToken passToken,
URI seqFileHdfs, URI srcHdfs, URI destHdfs, String jobtracker, Map tablePriorities, Configuration conf, int shutdownPort,
- int numHdfsThreads, List jobObservers) {
+ int numHdfsThreads, List jobObservers, int sleepTime, int failSleepTime, boolean writeStats, ImportMode importMode) {
this.conf = conf;
this.tablePriorities = tablePriorities;
this.workDir = new Path(workDir);
@@ -390,6 +434,10 @@ public BulkIngestMapFileLoader(String workDir, String jobDirPattern, String inst
this.destHdfs = destHdfs;
this.jobtracker = jobtracker;
this.running = true;
+ this.sleepTime = sleepTime;
+ this.failSleepTime = failSleepTime;
+ this.writeStats = writeStats;
+ this.importMode = importMode;
this.executor = Executors.newFixedThreadPool(numHdfsThreads > 0 ? numHdfsThreads : 1);
try {
this.jobObservable = new JobObservable(seqFileHdfs != null ? getFileSystem(seqFileHdfs) : null);
@@ -468,7 +516,7 @@ public void run() {
if (takeOwnershipJobDirectory(srcJobDirectory)) {
processedDirectories.add(srcJobDirectory);
Path mapFilesDir = new Path(srcJobDirectory, "mapFiles");
- if (INGEST_METRICS) {
+ if (writeStats) {
reporter.getCounter("MapFileLoader.StartTimes", srcJobDirectory.getName()).increment(System.currentTimeMillis());
}
Path dstJobDirectory = srcJobDirectory;
@@ -508,7 +556,7 @@ public void run() {
} else {
log.warn("Failed to mark " + dstJobDirectory + " as failed. Sleeping in case this was a transient failure.");
try {
- Thread.sleep(FAILURE_SLEEP_TIME);
+ Thread.sleep(failSleepTime);
} catch (InterruptedException ie) {
log.warn("Interrupted while sleeping.", ie);
}
@@ -907,19 +955,41 @@ public void run() {
// Ensure all of the files put just under tableDir....
collapseDirectory();
- // create the failures directory
- String failuresDir = mapFilesDir + "/failures/" + tableName;
- Path failuresPath = new Path(failuresDir);
- FileSystem fileSystem = FileSystem.get(srcHdfs, new Configuration());
- if (fileSystem.exists(failuresPath)) {
- log.fatal("Cannot bring map files online because a failures directory already exists: " + failuresDir);
- throw new IOException("Cannot bring map files online because a failures directory already exists: " + failuresDir);
- }
- fileSystem.mkdirs(failuresPath);
-
// import the directory
log.info("Bringing Map Files online for " + tableName);
- accumuloClient.tableOperations().importDirectory(tableName, tableDir.toString(), failuresDir, false);
+
+ // @formatter:off
+ switch (importMode) {
+ case V1:
+ // create the failures directory
+ String failuresDir = mapFilesDir + "/failures/" + tableName;
+ Path failuresPath = new Path(failuresDir);
+ FileSystem fileSystem = FileSystem.get(srcHdfs, new Configuration());
+ if (fileSystem.exists(failuresPath)) {
+ log.fatal("Cannot bring map files online because a failures directory already exists: " + failuresDir);
+ throw new IOException("Cannot bring map files online because a failures directory already exists: " + failuresDir);
+ }
+ fileSystem.mkdirs(failuresPath);
+ accumuloClient.tableOperations()
+ .importDirectory(tableName, tableDir.toString(), failuresDir, false);
+ break;
+ case V2_LOCAL_MAPPING:
+ accumuloClient.tableOperations().importDirectory(tableDir.toString())
+ .to(tableName)
+ .ignoreEmptyDir(true)
+ .tableTime(false).load();
+ break;
+ case V2_LOAD_PLANNING:
+ accumuloClient.tableOperations().importDirectory(tableDir.toString())
+ .to(tableName)
+ .plan(getLoadPlan())
+ .ignoreEmptyDir(true)
+ .tableTime(false).load();
+ break;
+ default:
+ throw new RuntimeException("Unsupported import mode " + importMode);
+ }
+ // @formatter:on
log.info("Completed bringing map files online for " + tableName);
validateComplete();
} catch (Exception e) {
@@ -933,6 +1003,28 @@ public void run() {
}
}
+ private LoadPlan getLoadPlan() throws IOException {
+ FileSystem fs = FileSystem.get(srcHdfs, new Configuration());
+ FileStatus[] loadPlans = fs.globStatus(new Path(tableDir, "loadplan*.json"));
+ var builder = LoadPlan.builder();
+ log.debug("Deserializing load plan for " + tableDir);
+ for (FileStatus lp : loadPlans) {
+ try (FSDataInputStream in = fs.open(lp.getPath())) {
+ byte[] buffer = new byte[(int) lp.getLen()];
+ in.readFully(0, buffer);
+ String s = new String(buffer, StandardCharsets.UTF_8);
+ // TODO: Use Gson streaming api instead to minimize impact on heap and cpu
+ builder.addPlan(gson.fromJson(s, LoadPlan.class));
+ }
+ }
+ LoadPlan lp = builder.build();
+ log.debug("Completed deserializing load plan for " + tableDir);
+ if (log.isTraceEnabled()) {
+ log.trace("Consolidated LoadPlan for " + tableDir + ": " + gson.toJson(lp));
+ }
+ return lp;
+ }
+
private void collapseDirectory() throws IOException {
collapseDirectory(tableDir);
}
@@ -997,7 +1089,8 @@ private String getNextName(String rfile) {
private void validateComplete() throws IOException {
FileSystem fileSystem = FileSystem.get(srcHdfs, new Configuration());
- if (fileSystem.listStatus(tableDir).length > 0) {
+ // Make sure all rfiles are processed, disregarding any loadplan*.json files
+ if (fileSystem.globStatus(new Path(tableDir, "*.rf")).length > 0) {
log.fatal("Failed to completely import " + tableDir);
throw new IOException("Failed to completely import " + tableDir);
}
@@ -1261,7 +1354,7 @@ public boolean markDirectoryForCleanup(Path jobDirectory, URI destFs) {
}
private void writeStats(Path[] jobDirectories) throws IOException {
- if (!INGEST_METRICS) {
+ if (!writeStats) {
log.info("ingest metrics disabled");
} else {
long now = System.currentTimeMillis();
@@ -1316,10 +1409,25 @@ private Path getCrcFile(Path path) {
private void sleep() {
try {
System.gc();
- Thread.sleep(SLEEP_TIME);
+ Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.warn("Interrupted while sleeping.", e);
}
}
+ public static class ByteArrayToBase64TypeAdapter implements JsonSerializer<byte[]>, JsonDeserializer<byte[]> {
+ Base64.Decoder decoder = Base64.getUrlDecoder();
+ Base64.Encoder encoder = Base64.getUrlEncoder();
+
+ @Override
+ public byte[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ return decoder.decode(json.getAsString());
+ }
+
+ @Override
+ public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
+ return new JsonPrimitive(encoder.encodeToString(src));
+ }
+ }
+
}
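For readers unfamiliar with the Gson/LoadPlan plumbing added above (the mappers write `loadplan*.json` files that `getLoadPlan()` later merges before calling the V2 import API), here is a rough, self-contained sketch of the same round trip using the same Base64 byte[] adapter; the rfile name and row values are made up for illustration:

```java
import java.lang.reflect.Type;
import java.util.Base64;

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.hadoop.io.Text;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;

public class LoadPlanJsonDemo {
    // Same idea as BulkIngestMapFileLoader.ByteArrayToBase64TypeAdapter: LoadPlan row
    // boundaries are byte[], which need an explicit adapter to survive a JSON round trip.
    static class ByteArrayToBase64TypeAdapter implements JsonSerializer<byte[]>, JsonDeserializer<byte[]> {
        public byte[] deserialize(JsonElement json, Type t, JsonDeserializationContext ctx) {
            return Base64.getUrlDecoder().decode(json.getAsString());
        }

        public JsonElement serialize(byte[] src, Type t, JsonSerializationContext ctx) {
            return new JsonPrimitive(Base64.getUrlEncoder().encodeToString(src));
        }
    }

    public static void main(String[] args) {
        Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();

        // One rfile mapped to the tablet (20170601_0, 20170601_1] of a hypothetical shard table
        LoadPlan plan = LoadPlan.builder()
                        .loadFileTo("I2abcdef01.rf", RangeType.TABLE, new Text("20170601_0"), new Text("20170601_1"))
                        .build();

        String json = gson.toJson(plan);                        // what a mapper would write to loadplan*.json
        LoadPlan restored = gson.fromJson(json, LoadPlan.class); // what the loader reconstructs at import time
        System.out.println(json + " -> " + restored.getDestinations().size() + " destination(s)");
    }
}
```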
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
index 1495bf78b82..09d06b8a11c 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
@@ -1,16 +1,27 @@
package datawave.ingest.mapreduce.job;
+import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
import static org.apache.accumulo.core.conf.Property.TABLE_CRYPTO_PREFIX;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -20,6 +31,8 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
@@ -30,6 +43,7 @@
import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,9 +60,12 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;
+import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
import datawave.marking.MarkingFunctions;
import datawave.util.StringUtils;
@@ -56,7 +73,13 @@ public class MultiRFileOutputFormatter extends FileOutputFormat<BulkIngestKey,Value> {
protected Map<String,SizeTrackingWriter> writers = null;
+ protected boolean loadPlanningEnabled = false;
+ protected Map<String,List<LoadPlan>> loadPlans = null;
+ protected List<CompletableFuture<Void>> loadPlanFutures = null;
protected Map<String,Path> unusedWriterPaths = null;
protected Map<String,Path> usedWriterPaths = null;
protected Map<String,String> writerTableNames = null;
@@ -176,6 +199,10 @@ public static void addTableToLocalityGroupConfiguration(Configuration conf, Stri
conf.set(CONFIGURE_LOCALITY_GROUPS, Joiner.on(",").join(splits, tableName));
}
+ public static boolean loadPlanningEnabled(Configuration conf) {
+ return conf.getEnum(BULK_IMPORT_MODE_CONFIG, ImportMode.V1).equals(ImportMode.V2_LOAD_PLANNING);
+ }
+
/**
* Insert a count into the filename. The filename is expected to end with our extension.
*
@@ -250,9 +277,13 @@ protected void createAndRegisterWriter(String key, String table, Path filename,
protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) throws IOException {
startWriteTime = System.currentTimeMillis();
- CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConf.getAllCryptoProperties());
+ // @formatter:off
+ CryptoService cs = CryptoFactoryLoader.getServiceForClient(
+ CryptoEnvironment.Scope.TABLE, tableConf.getAllCryptoProperties());
return new SizeTrackingWriter(
- FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, conf, cs).withTableConfiguration(tableConf).build());
+ FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, conf, cs).withTableConfiguration(tableConf).build(),
+ this.loadPlanningEnabled);
+ // @formatter:on
}
/**
@@ -270,19 +301,186 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
// don't bother if we have not created a writer for this key yet
if (writer != null) {
String table = writerTableNames.get(key);
- Path filename = usedWriterPaths.get(key);
+ Path file = usedWriterPaths.get(key);
// don't bother if this writer has not been used yet
- if (filename != null) {
+ if (file != null) {
+ if (writer.isLoadPlanning()) {
+ computeLoadPlan(writer, table, file);
+ }
writer.close();
// pull the index off the filename
- filename = removeFileCount(filename);
- createAndRegisterWriter(key, table, filename, tableConfigs.get(table));
+ file = removeFileCount(file);
+ createAndRegisterWriter(key, table, file, tableConfigs.get(table));
+ }
+ }
+ }
+
+ /**
+ * Init table's LoadPlan list and compute the plan for the given rfile asynchronously
+ *
+ * @param writer
+ * RFile writer
+ * @param tableName
+ * table name
+ * @param rfile
+ * rfile path
+ */
+ protected void computeLoadPlan(SizeTrackingWriter writer, String tableName, Path rfile) {
+ if (!loadPlans.containsKey(tableName)) {
+ loadPlans.put(tableName, new LinkedList<>());
+ }
+ // @formatter:off
+ loadPlanFutures.add(CompletableFuture
+ .supplyAsync(() -> compute(rfile, writer.rows, tableName, conf))
+ .thenAccept(plan -> {
+ if (plan != null) {
+ loadPlans.get(tableName).add(plan);
+ }
+ }));
+ // @formatter:on
+ }
+
+ protected void commitLoadPlans(TaskAttemptContext context) {
+ loadPlanFutures.stream().forEach(f -> {
+ try {
+ f.get();
+ } catch (Exception e) {
+ log.error("Load planning failed", e);
+ throw new RuntimeException(e);
+ }
+ });
+ writeLoadPlans(context);
+ }
+
+ private void writeLoadPlans(TaskAttemptContext context) {
+ log.debug("Writing bulk load plans to disk for all tables");
+ // Consolidate all plans for a table into a single file
+ List<CompletableFuture<Void>> futures = new LinkedList<>();
+ for (Map.Entry<String,List<LoadPlan>> entry : loadPlans.entrySet()) {
+ var builder = LoadPlan.builder();
+ entry.getValue().stream().forEach(plan -> builder.addPlan(plan));
+ var table = entry.getKey();
+ var path = new Path(String.format("%s/%s", workDir, table), getUniqueFile(context, "loadplan", ".json"));
+ var loadPlan = builder.build();
+ // TODO: Use Gson streaming api instead (JsonWriter) to minimize impact on heap
+ //@formatter:off
+ futures.add(CompletableFuture.runAsync(() -> {
+ try (FSDataOutputStream out = fs.create(path)) {
+ log.debug("Begin writing load plan for " + path);
+ out.write(gson.toJson(loadPlan).getBytes(StandardCharsets.UTF_8));
+ log.debug("Completed writing load plan for " + path);
+ } catch (IOException ioe) {
+ log.error("Failed to write plan for " + path, ioe);
+ throw new RuntimeException(ioe);
+ }
+ }));
+ //@formatter:on
+ }
+ futures.stream().forEach(f -> {
+ try {
+ f.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ log.debug("Finished writing bulk load plans to disk");
+ }
+
+ /**
+ * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
+ *
+ * @param rfile
+ * RFile path
+ * @param rfileRows
+ * Set of rows contained in the RFile
+ * @param tableName
+ * Table whose splits are to be examined to create the mapping
+ * @param conf
+ * The configuration required to retrieve table splits
+ * @return LoadPlan for the RFile
+ */
+ static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, String tableName, Configuration conf) {
+ if (rfileRows != null && !rfileRows.isEmpty()) {
+ try {
+ var splits = SplitsFile.getSplits(conf, tableName);
+ return compute(rfile, rfileRows, splits);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve splits!", e);
}
}
+ return null;
+ }
+
+ /**
+ * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
+ *
+ * @param rfile
+ * RFile path
+ * @param rfileRows
+ * Set of rows contained in the RFile
+ * @param tableSplits
+ * Splits for the table being targeted
+ * @return LoadPlan for the RFile.
+ */
+ static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, List<Text> tableSplits) {
+ // @formatter:off
+ var builder = LoadPlan.builder();
+ if (tableSplits == null || tableSplits.isEmpty()) {
+ // RFile examination and mapping handled later by bulk import client
+ log.debug("Calculating FILE load plan for " + rfile);
+ var ke = new KeyExtent(rfileRows.first(), rfileRows.last());
+ builder.addPlan(LoadPlan.builder().loadFileTo(
+ rfile.getName(), RangeType.FILE, ke.prevEndRow, ke.endRow).build());
+ } else {
+ // Compute extent mapping so that we can skip examination at load time
+ log.debug("Calculating TABLE load plan for " + rfile);
+ rfileRows.stream()
+ .map(row -> findKeyExtent(row, tableSplits))
+ .collect(Collectors.toCollection(HashSet::new))
+ .forEach(ke -> builder.addPlan(LoadPlan.builder().loadFileTo(
+ rfile.getName(), RangeType.TABLE, ke.prevEndRow, ke.endRow).build())
+ );
+ log.debug("Table load plan completed for file: " + rfile);
+ }
+ // @formatter:on
+ return builder.build();
+ }
+
+ /**
+ * Finds the KeyExtent where the specified row should reside
+ *
+ * @param lookupRow
+ * Row value to be mapped
+ * @param tableSplits
+ * Splits for the table in question
+ * @return KeyExtent mapping for the given row
+ */
+ static KeyExtent findKeyExtent(Text lookupRow, List<Text> tableSplits) {
+ var ke = new KeyExtent();
+ var ceilingIdx = findCeiling(lookupRow, tableSplits);
+ ke.prevEndRow = ceilingIdx > 0 ? tableSplits.get(ceilingIdx - 1) : null;
+ ke.endRow = ceilingIdx < tableSplits.size() ? tableSplits.get(ceilingIdx) : null;
+ return ke;
+ }
+
+ /**
+ * Performs binary search on tableSplits to find the index of the first (least) split ≥ lookupRow
+ *
+ * @param lookupRow
+ * Row for which we want to find the ceiling
+ * @param tableSplits
+ * Sorted table splits list whose implementation is assumed to provide fast random access (i.e., {@link java.util.RandomAccess})
+ * @return the index of lookupRow, if found. Otherwise, returns lookupRow's 'insertion point' as defined by {@link Collections#binarySearch(List, Object)}
+ */
+ static int findCeiling(Text lookupRow, List<Text> tableSplits) {
+ var idx = Collections.binarySearch(tableSplits, lookupRow);
+ return idx >= 0 ? idx : -(idx + 1);
}
public static class SizeTrackingWriter implements FileSKVWriter {
private FileSKVWriter delegate;
+ private boolean loadPlanning;
+ SortedSet<Text> rows;
long size = 0;
int entries = 0;
@@ -310,6 +508,9 @@ public void append(Key key, Value value) throws IOException {
entries++;
size += key.getLength() + (value == null ? 0 : value.getSize());
delegate.append(key, value);
+ if (loadPlanning) {
+ rows.add(key.getRow());
+ }
}
public DataOutputStream createMetaStore(String name) throws IOException {
@@ -325,8 +526,16 @@ public long getLength() throws IOException {
return getSize();
}
- public SizeTrackingWriter(FileSKVWriter delegate) {
+ public boolean isLoadPlanning() {
+ return loadPlanning;
+ }
+
+ public SizeTrackingWriter(FileSKVWriter delegate, boolean loadPlanning) {
this.delegate = delegate;
+ this.loadPlanning = loadPlanning;
+ if (this.loadPlanning) {
+ rows = new TreeSet<>();
+ }
}
}
@@ -459,6 +668,12 @@ public RecordWriter<BulkIngestKey,Value> getRecordWriter(final TaskAttemptContext
columnFamilyToLocalityGroup = Maps.newHashMap();
localityGroupToColumnFamilies = Maps.newHashMap();
+ loadPlanningEnabled = loadPlanningEnabled(conf);
+ if (loadPlanningEnabled) {
+ loadPlans = new ConcurrentHashMap<>();
+ loadPlanFutures = new LinkedList<>();
+ }
+
extension = conf.get(FILE_TYPE);
if (extension == null || extension.isEmpty())
extension = RFile.EXTENSION;
@@ -557,15 +772,24 @@ public void write(BulkIngestKey key, Value value) throws IOException {
}
}
writer.append(key.getKey(), value);
-
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- // Close all of the Map File Writers
- for (SizeTrackingWriter writer : writers.values()) {
+ for (Map.Entry entry : writers.entrySet()) {
+ var writer = entry.getValue();
+ if (writer.isLoadPlanning()) {
+ var tableName = writerTableNames.get(entry.getKey());
+ var rfilePath = usedWriterPaths.get(entry.getKey());
+ computeLoadPlan(writer, tableName, rfilePath);
+ }
writer.close();
}
+
+ if (loadPlanningEnabled) {
+ commitLoadPlans(context);
+ }
+
// To verify the file was actually written successfully, we need to reopen it which will reread
// the index at the end and verify its integrity.
FileOperations fops = FileOperations.getInstance();
@@ -626,6 +850,9 @@ private SizeTrackingWriter getOrCreateWriter(TaskAttemptContext context, String
if (shardLocation == null) {
// in this case we have a shard id that has no split. Lets put this in one "extra" file
shardLocation = "extra";
+ } else {
+ // Ensure there's no colon
+ shardLocation = shardLocation.replace(":", "_");
}
}
// Combine table name with shard location so that we end up
@@ -675,4 +902,56 @@ protected Map getShardLocations(String tableName) throws IOExceptio
return tableShardLocations.get(tableName);
}
+
+ /**
+ * Simplified representation of an Accumulo KeyExtent, used here to track mapped tablets during LoadPlan creation (and to avoid yet another Accumulo
+ * "disallowed import")
+ */
+ static class KeyExtent implements Comparable<KeyExtent> {
+ Text prevEndRow = null;
+ Text endRow = null;
+
+ KeyExtent(Text prevEndRow, Text endRow) {
+ this.prevEndRow = prevEndRow;
+ this.endRow = endRow;
+ }
+
+ KeyExtent() {}
+
+ private static final Comparator<KeyExtent> COMPARATOR = Comparator.comparing(KeyExtent::endRow, Comparator.nullsLast(Text::compareTo))
+ .thenComparing(KeyExtent::prevEndRow, Comparator.nullsFirst(Text::compareTo));
+
+ public Text endRow() {
+ return endRow;
+ }
+
+ public Text prevEndRow() {
+ return prevEndRow;
+ }
+
+ @Override
+ public int compareTo(KeyExtent other) {
+ return COMPARATOR.compare(this, other);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ KeyExtent keyExtent = (KeyExtent) o;
+ return Objects.equals(endRow, keyExtent.endRow) && Objects.equals(prevEndRow, keyExtent.prevEndRow);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(endRow, prevEndRow);
+ }
+
+ @Override
+ public String toString() {
+ return (prevEndRow == null ? "null" : prevEndRow.toString()) + ";" + (endRow == null ? "null" : endRow.toString());
+ }
+ }
}
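To make the extent-mapping helpers above easier to follow (findCeiling locates the first split >= the row, and findKeyExtent turns that index into a (prevEndRow, endRow] tablet range), here is a minimal standalone sketch of the same ceiling search. It deliberately uses String rows instead of Hadoop Text to stay dependency-free, so it is an illustration of the approach rather than the class's actual code; the split values echo the unit test below:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CeilingSearchDemo {
    /** Index of the first split >= lookupRow, using Collections.binarySearch's "insertion point" when not found. */
    static int findCeiling(String lookupRow, List<String> tableSplits) {
        int idx = Collections.binarySearch(tableSplits, lookupRow);
        return idx >= 0 ? idx : -(idx + 1);
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("20170601_0", "20170601_1", "20170601_2");

        for (String row : Arrays.asList("20160101_9", "20170601_0", "20170601_1a", "20170602_5")) {
            int ceiling = findCeiling(row, splits);
            // The row lands in the tablet (prevEndRow, endRow]; null marks an unbounded extent
            String prevEndRow = ceiling > 0 ? splits.get(ceiling - 1) : null;
            String endRow = ceiling < splits.size() ? splits.get(ceiling) : null;
            System.out.println(row + " -> (" + prevEndRow + ", " + endRow + "]");
        }
    }
}
```

A row that sorts before the first split maps to the extent (null, firstSplit], and a row beyond the last split maps to (lastSplit, null], which is why the computed KeyExtent allows null on either end.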
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoaderTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoaderTest.java
index 916d87f7a5d..7498bd25d65 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoaderTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoaderTest.java
@@ -1,5 +1,9 @@
package datawave.ingest.mapreduce.job;
+import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.COMPLETE_FILE_MARKER;
+import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.FAILED_FILE_MARKER;
+import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.INPUT_FILES_MARKER;
+
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -11,12 +15,27 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -32,12 +51,16 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
import org.powermock.api.easymock.PowerMock;
import org.powermock.reflect.Whitebox;
@@ -51,18 +74,37 @@
import datawave.ingest.data.config.ingest.BaseIngestHelper;
import datawave.ingest.input.reader.EventRecordReader;
import datawave.ingest.input.reader.LongLineEventRecordReader;
+import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
@Category(IntegrationTest.class)
public class BulkIngestMapFileLoaderTest {
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
protected static final URI FILE_SYSTEM_URI = URI.create("file:///");
protected static final Logger logger = Logger.getLogger(BulkIngestMapFileLoaderTest.class);
- protected Level testDriverLevel;
- private List systemProperties;
+ private static final String PASSWORD = "secret";
- private Configuration conf = new Configuration();
+ private static final String USER = "root";
+
+ private static final Authorizations USER_AUTHS = new Authorizations("BAR", "FOO", "PRIVATE", "PUBLIC");
+
+ private static final String METADATA_TABLE = "metadata";
+ private static final String METADATA_RFILE_PATH = "/datawave/rfiles/metadata/I3abcdef01.rf";
+
+ private static final String SHARD_TABLE = "shard";
+ private static final String SHARD_RFILE_PATH = "/datawave/rfiles/shard/I2abcdef01.rf";
+
+ private static MiniAccumuloCluster cluster;
+ private static File tmpDir;
+ private static java.nio.file.Path workPath;
+ private static java.nio.file.Path flaggedPath;
+ private static java.nio.file.Path loadedPath;
+ private static URI metadataRfile;
+ private static URI shardRfile;
@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
@@ -70,6 +112,141 @@ public class BulkIngestMapFileLoaderTest {
@Rule
public TestLogCollector logCollector = new TestLogCollector.Builder().with(BulkIngestMapFileLoader.class, Level.ALL).build();
+ protected Level testDriverLevel;
+
+ private List systemProperties;
+ private Configuration conf = new Configuration();
+
+ @BeforeClass
+ public static void setupClass() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException, IOException,
+ InterruptedException, URISyntaxException {
+ tmpDir = temporaryFolder.newFolder();
+ cluster = new MiniAccumuloCluster(tmpDir, PASSWORD);
+ cluster.start();
+
+ workPath = Paths.get(tmpDir.getAbsolutePath(), "datawave", "ingest", "work");
+ Files.createDirectories(workPath);
+
+ flaggedPath = Files.createDirectory(Paths.get(workPath.toString(), "flagged"));
+ loadedPath = Files.createDirectory(Paths.get(workPath.toString(), "loaded"));
+
+ metadataRfile = BulkIngestMapFileLoaderTest.class.getResource(METADATA_RFILE_PATH).toURI();
+ shardRfile = BulkIngestMapFileLoaderTest.class.getResource(SHARD_RFILE_PATH).toURI();
+
+ try (AccumuloClient client = cluster.createAccumuloClient(USER, new PasswordToken(PASSWORD))) {
+ if (!client.tableOperations().exists(METADATA_TABLE)) {
+ client.tableOperations().create(METADATA_TABLE);
+ }
+ if (!client.tableOperations().exists(SHARD_TABLE)) {
+ client.tableOperations().create(SHARD_TABLE);
+ }
+ client.securityOperations().changeUserAuthorizations(USER, USER_AUTHS);
+ }
+ }
+
+ /**
+ * Sets up all inputs required to process a completed ingest job (job.complete) against the running MAC
+ *
+ * @param jobName
+ * should uniquely identify the bulk load job to be run
+ * @param loaderSleepTime
+ * desired sleep time (in ms) for the bulk loader
+ *
+ * @return BulkIngestMapFileLoader instance for running the job
+ * @throws IOException
+ */
+ private BulkIngestMapFileLoader setupJobComplete(String jobName, int loaderSleepTime) throws IOException {
+
+ Assert.assertFalse("jobName can't be null/empty", jobName == null || jobName.isEmpty());
+
+ java.nio.file.Path metaSrc, metaDest, shardSrc, shardDest, inputFilesPath, inputFile, jobPathsFile;
+
+ String parentDir = workPath.toString();
+ String mapFilesDir = "mapFiles";
+
+ Assert.assertFalse(jobName + " directory already exists", Files.exists(Paths.get(parentDir, jobName)));
+ Assert.assertFalse(jobName + " flagged directory already exists", Files.exists(Paths.get(flaggedPath.toString(), jobName)));
+ Assert.assertFalse(jobName + " loaded directory already exists", Files.exists(Paths.get(loadedPath.toString(), jobName)));
+
+ // Copy metadata rfile into jobName/mapFiles/DW_METADATA_TABLE dir
+ metaSrc = Paths.get(metadataRfile);
+ metaDest = Files.createDirectories(Paths.get(parentDir, jobName, mapFilesDir, METADATA_TABLE));
+ Files.copy(metaSrc, Paths.get(metaDest.toString(), metaSrc.getFileName().toString()));
+
+ // Copy shard rfile into jobName/mapFiles/DW_SHARD_TABLE dir
+ shardSrc = Paths.get(shardRfile);
+ shardDest = Files.createDirectories(Paths.get(parentDir, jobName, mapFilesDir, SHARD_TABLE));
+ Files.copy(shardSrc, Paths.get(shardDest.toString(), shardSrc.getFileName().toString()));
+
+ // Create 'job.paths' marker and associated dummy input file...
+ inputFilesPath = Files.createDirectory(Paths.get(flaggedPath.toString(), jobName));
+ inputFile = Files.createFile(Paths.get(inputFilesPath.toString(), "dummy"));
+ jobPathsFile = Files.createFile(Paths.get(parentDir, jobName, INPUT_FILES_MARKER));
+ Files.write(jobPathsFile, inputFile.toString().getBytes(StandardCharsets.UTF_8));
+
+ // Create 'job.complete' marker
+ Files.createFile(Paths.get(parentDir, jobName, COMPLETE_FILE_MARKER));
+
+ // @formatter:off
+ return new BulkIngestMapFileLoader(
+ workPath.toString(),
+ "*",
+ cluster.getInstanceName(),
+ cluster.getZooKeepers(),
+ USER,
+ new PasswordToken(PASSWORD),
+ tmpDir.toURI(),
+ tmpDir.toURI(),
+ tmpDir.toURI(),
+ null,
+ new HashMap<>(),
+ conf,
+ 0,
+ 1,
+ new ArrayList<>(),
+ loaderSleepTime,
+ loaderSleepTime,
+ false,
+ ImportMode.V2_LOCAL_MAPPING);
+ // @formatter:on
+ }
+
+ private void verifyImportedData() throws TableNotFoundException {
+
+ long shardKeyCount = 0;
+ long metaKeyCount = 0;
+
+ Collection<Range> ranges = Collections.singleton(new Range());
+ try (AccumuloClient client = cluster.createAccumuloClient(USER, new PasswordToken(PASSWORD))) {
+ // Count shard keys
+ BatchScanner scanner = client.createBatchScanner(SHARD_TABLE, USER_AUTHS);
+ scanner.setRanges(ranges);
+ Iterator it = scanner.iterator();
+ while (it.hasNext()) {
+ it.next();
+ shardKeyCount++;
+ }
+ scanner.close();
+
+ // Count metadata keys
+ scanner = client.createBatchScanner(METADATA_TABLE, USER_AUTHS);
+ scanner.setRanges(ranges);
+ it = scanner.iterator();
+ while (it.hasNext()) {
+ it.next();
+ metaKeyCount++;
+ }
+ scanner.close();
+ }
+ Assert.assertEquals("Unexpected number of shard entries", 16301, shardKeyCount);
+ Assert.assertEquals("Unexpected number of metadata entries", 380, metaKeyCount);
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ cluster.close();
+ }
+
@Test
public void testShutdownPortAlreadyInUse() throws IOException {
exit.expectSystemExitWithStatus(-3);
@@ -540,6 +717,108 @@ public void testMainWithAllOptionalArgs() throws IOException, InterruptedExcepti
}
}
+ /**
+ * Use MAC to verify that the bulk loader successfully loads rfiles into the shard and metadata tables
+ */
+ @Test
+ public void testLoaderWithMiniAccumuloCluster() {
+ BulkIngestMapFileLoaderTest.logger.info("testLoaderWithMiniAccumuloCluster called...");
+
+ List<String> log = logCollector.getMessages();
+ Assert.assertTrue("Unexpected log messages", log.isEmpty());
+
+ BulkIngestMapFileLoader processor = null;
+ try {
+ processor = setupJobComplete("job1", 1000);
+ new Thread(processor, "map-file-watcher").start();
+
+ // Wait up to 30 secs for the bulk loader to log completion
+ for (int i = 1; i <= 15; i++) {
+ Thread.sleep(2000);
+ if (log.contains("Marking 1 sequence files from flagged to loaded")) {
+ break;
+ }
+ }
+
+ Assert.assertTrue("Unexpected log output", log.contains("Bringing Map Files online for " + METADATA_TABLE));
+ Assert.assertTrue("Unexpected log output", log.contains("Bringing Map Files online for " + SHARD_TABLE));
+ Assert.assertTrue("Unexpected log output", log.contains("Completed bringing map files online for " + METADATA_TABLE));
+ Assert.assertTrue("Unexpected log output", log.contains("Completed bringing map files online for " + SHARD_TABLE));
+ Assert.assertTrue("Unexpected log output", log.contains("Marking 1 sequence files from flagged to loaded"));
+
+ verifyImportedData();
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ BulkIngestMapFileLoaderTest.logger.info("testLoaderWithMiniAccumuloCluster completed.");
+ if (processor != null) {
+ processor.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Use MAC to verify that the bulk loader fails as expected when given invalid rfile(s)
+ */
+ @Test
+ public void testLoadFailedWithMiniAccumuloCluster() {
+ BulkIngestMapFileLoaderTest.logger.info("testLoadFailedWithMiniAccumuloCluster called...");
+
+ List<String> log = logCollector.getMessages();
+ Assert.assertTrue("Unexpected log messages", log.isEmpty());
+
+ String jobName = "job2";
+
+ java.nio.file.Path metaRfile, failedMarker;
+
+ // expected marker file
+ failedMarker = Paths.get(workPath.toString(), jobName, FAILED_FILE_MARKER);
+
+ BulkIngestMapFileLoader processor = null;
+ try {
+ // Create/configure 'job2'
+ processor = setupJobComplete(jobName, 500);
+
+ // rfile to corrupt...
+ metaRfile = Paths.get(workPath.toString(), jobName, "mapFiles", METADATA_TABLE, "I3abcdef01.rf");
+
+ Assert.assertTrue("metadata rfile is missing after setup", Files.exists(metaRfile));
+
+ // Write invalid content...
+ Files.delete(metaRfile);
+ Files.createFile(metaRfile);
+ Files.write(metaRfile, "Invalid rfile content here".getBytes(StandardCharsets.UTF_8));
+
+ String expectedMsg = "Error importing files into table " + METADATA_TABLE + " from directory file:"
+ + Paths.get(workPath.toString(), jobName, "mapFiles");
+
+ // Start the loader
+ new Thread(processor, "map-file-watcher").start();
+
+ // Wait up to 30 secs for the bulk loader to log the failure
+ for (int i = 1; i <= 10; i++) {
+ Thread.sleep(3000);
+ if (log.contains(expectedMsg)) {
+ break;
+ }
+ }
+
+ Assert.assertTrue("Unexpected log output", log.contains("Bringing Map Files online for " + METADATA_TABLE));
+ Assert.assertTrue("Unexpected log output", log.contains(expectedMsg));
+ Assert.assertTrue("Bad metadata rfile should have remained in the job dir: " + metaRfile, Files.exists(metaRfile));
+ Assert.assertTrue("Missing 'job.failed' marker after failed import", Files.exists(failedMarker));
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ BulkIngestMapFileLoaderTest.logger.info("testLoadFailedWithMiniAccumuloCluster completed.");
+ if (processor != null) {
+ processor.shutdown();
+ }
+ }
+ }
+
@Test
public void testMainWithAllOptionalArgsNoTablePriorites() throws IOException, InterruptedException {
@@ -1909,7 +2188,7 @@ public void testTakeOwnershipJobDirectoryHappyPath() throws Exception {
String filePath = String.format("%s%s", url.toString(), BulkIngestMapFileLoader.LOADING_FILE_MARKER);
exists.put(filePath, Boolean.TRUE);
- filePath = String.format("%s%s", url.toString(), BulkIngestMapFileLoader.COMPLETE_FILE_MARKER);
+ filePath = String.format("%s%s", url.toString(), COMPLETE_FILE_MARKER);
exists.put(filePath, Boolean.FALSE);
BulkIngestMapFileLoaderTest.WrappedLocalFileSystem fs = new BulkIngestMapFileLoaderTest.WrappedLocalFileSystem(createMockInputStream(),
@@ -1957,7 +2236,7 @@ public void testTakeOwnershipJobDirectoryFailedOwnershipExchangeLoading() throws
Map existsResults = new HashMap<>();
String filePath = String.format("%s%s", url, BulkIngestMapFileLoader.LOADING_FILE_MARKER);
existsResults.put(filePath, Boolean.FALSE);
- filePath = String.format("%s%s", url, BulkIngestMapFileLoader.COMPLETE_FILE_MARKER);
+ filePath = String.format("%s%s", url, COMPLETE_FILE_MARKER);
existsResults.put(filePath, Boolean.TRUE);
BulkIngestMapFileLoaderTest.WrappedLocalFileSystem fs = new BulkIngestMapFileLoaderTest.WrappedLocalFileSystem(createMockInputStream(),
@@ -2008,7 +2287,7 @@ public void testTakeOwnershipJobDirectoryFailedOwnershipExchangeComplete() throw
Map existsResults = new HashMap<>();
String filePath = String.format("%s%s", url, BulkIngestMapFileLoader.LOADING_FILE_MARKER);
existsResults.put(filePath, Boolean.TRUE);
- filePath = String.format("%s%s", url, BulkIngestMapFileLoader.COMPLETE_FILE_MARKER);
+ filePath = String.format("%s%s", url, COMPLETE_FILE_MARKER);
existsResults.put(filePath, Boolean.TRUE);
BulkIngestMapFileLoaderTest.WrappedLocalFileSystem fs = new BulkIngestMapFileLoaderTest.WrappedLocalFileSystem(createMockInputStream(),
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
index 09c31d5e690..be7ea9df7c4 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
@@ -1,5 +1,9 @@
package datawave.ingest.mapreduce.job;
+import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
+import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findKeyExtent;
+import static org.junit.Assert.assertEquals;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -10,12 +14,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.commons.codec.binary.Base64;
@@ -40,6 +48,8 @@
import org.powermock.api.easymock.PowerMock;
import datawave.ingest.data.config.ingest.AccumuloHelper;
+import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
+import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.KeyExtent;
import datawave.util.TableName;
public class MultiRFileOutputFormatterTest {
@@ -141,7 +151,6 @@ public void teardown() {
MultiRFileOutputFormatterTest.logger.setLevel(testDriverLevel);
Logger.getLogger(MultiRFileOutputFormatter.class).setLevel(uutLevel);
-
}
@Test(expected = IllegalArgumentException.class)
@@ -162,6 +171,87 @@ public void testSetCompressionTypeWithBadType() {
}
}
+ private Map<String,List<LoadPlan>> tableLoadPlans = new HashMap<>();
+
+ private ArrayList<Text> getSplits() {
+ var arr = new ArrayList<Text>();
+ arr.add(new Text("20170601_0")); // 0
+ arr.add(new Text("20170601_1")); // 1
+ arr.add(new Text("20170601_2")); // 2
+ arr.add(new Text("20170601_3")); // 3
+ arr.add(new Text("20170601_4")); // 4
+ arr.add(new Text("20170601_5")); // 5
+ arr.add(new Text("20170601_6")); // 6
+ arr.add(new Text("20170601_7")); // 7
+ arr.add(new Text("20170601_8")); // 8
+ arr.add(new Text("20170601_9")); // 9
+ arr.add(new Text("20170602_0")); // 10
+ arr.add(new Text("20170602_1")); // 11
+ arr.add(new Text("20170602_2")); // 12
+ arr.add(new Text("20170602_3")); // 13
+ arr.add(new Text("20170602_4")); // 14
+ arr.add(new Text("20170602_5")); // 15
+ arr.add(new Text("20170602_6")); // 16
+ arr.add(new Text("20170602_7")); // 17
+ arr.add(new Text("20170602_8")); // 18
+ arr.add(new Text("20170602_9")); // 19
+ arr.add(new Text("20170602_9a")); // 20
+ arr.add(new Text("20170602_9b")); // 21
+ arr.add(new Text("20170602_9c")); // 22
+ arr.add(new Text("20170603_0")); // 23
+ arr.add(new Text("20170603_0a")); // 24
+ arr.add(new Text("20170603_0b")); // 25
+ arr.add(new Text("20170603_0c")); // 26
+ arr.add(new Text("20170603_1")); // 27
+ arr.add(new Text("20170603_2")); // 28
+ arr.add(new Text("20170603_3")); // 29
+ arr.add(new Text("20170603_4")); // 30
+ arr.add(new Text("20170603_5")); // 31
+ arr.add(new Text("20170603_6")); // 32
+ arr.add(new Text("20170603_7")); // 34
+ arr.add(new Text("20170603_8")); // 35
+ arr.add(new Text("20170603_9")); // 36
+ return arr;
+ }
+
+ @Test
+ public void testPlanning() {
+ SortedSet<Text> rfileRows = new TreeSet<>();
+ rfileRows.add(new Text("20160602_0"));
+ rfileRows.add(new Text("20170601_0"));
+ rfileRows.add(new Text("20170601_1"));
+ rfileRows.add(new Text("20170602_1"));
+ rfileRows.add(new Text("20170602_0a1"));
+ rfileRows.add(new Text("20170602_0a11"));
+ rfileRows.add(new Text("20170602_0a111"));
+ rfileRows.add(new Text("20170602_0b1"));
+ rfileRows.add(new Text("20170602_0c1"));
+ rfileRows.add(new Text("20170603_0"));
+ rfileRows.add(new Text("20170603_0a11"));
+ rfileRows.add(new Text("20170603_0a12"));
+ rfileRows.add(new Text("20170603_0b"));
+ rfileRows.add(new Text("20170603_0c"));
+ rfileRows.add(new Text("20170603_0d"));
+ rfileRows.add(new Text("20170601_9"));
+ rfileRows.add(new Text("20200601_9"));
+
+ Set<KeyExtent> expectedExtents = new HashSet<>();
+ expectedExtents.add(new KeyExtent(new Text("20170601_0"), new Text("20170601_1")));
+ expectedExtents.add(new KeyExtent(new Text("20170601_8"), new Text("20170601_9")));
+ expectedExtents.add(new KeyExtent(new Text("20170602_0"), new Text("20170602_1")));
+ expectedExtents.add(new KeyExtent(new Text("20170603_9"), null));
+ expectedExtents.add(new KeyExtent(new Text("20170603_0c"), new Text("20170603_1")));
+ expectedExtents.add(new KeyExtent(null, new Text("20170601_0")));
+ expectedExtents.add(new KeyExtent(new Text("20170602_9c"), new Text("20170603_0")));
+ expectedExtents.add(new KeyExtent(new Text("20170603_0a"), new Text("20170603_0b")));
+ expectedExtents.add(new KeyExtent(new Text("20170603_0b"), new Text("20170603_0c")));
+
+ List<Text> tableSplits = getSplits();
+ Set<KeyExtent> extents = rfileRows.stream().map(row -> findKeyExtent(row, tableSplits)).collect(Collectors.toCollection(HashSet::new));
+
+ assertEquals(expectedExtents, extents);
+ }
+
@Test
public void testSetCompressionType() {
@@ -179,25 +269,25 @@ public void testSetCompressionType() {
String expected = "snappy";
MultiRFileOutputFormatter.setCompressionType(createMockConfiguration(), expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatterTest.mockedConfiguration.get(compressionKey));
expected = "lzo";
MultiRFileOutputFormatter.setCompressionType(createMockConfiguration(), expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatterTest.mockedConfiguration.get(compressionKey));
expected = "gz";
MultiRFileOutputFormatter.setCompressionType(createMockConfiguration(), expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatterTest.mockedConfiguration.get(compressionKey));
expected = "none";
MultiRFileOutputFormatter.setCompressionType(createMockConfiguration(), expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatterTest.mockedConfiguration.get(compressionKey));
} finally {
@@ -216,7 +306,7 @@ public void testGetCompressionTypeWithoutSetting() {
String expected = "gz";
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatter.getCompressionType(createMockConfiguration()));
} finally {
@@ -237,25 +327,25 @@ public void testGetCompressionType() {
String expected = "snappy";
MultiRFileOutputFormatter.setCompressionType(conf, expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatter.getCompressionType(conf));
expected = "lzo";
MultiRFileOutputFormatter.setCompressionType(conf, expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatter.getCompressionType(conf));
expected = "gz";
MultiRFileOutputFormatter.setCompressionType(conf, expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatter.getCompressionType(conf));
expected = "none";
MultiRFileOutputFormatter.setCompressionType(conf, expected);
- Assert.assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
+ assertEquals("MultiRFileOutputFormatter.setCompressionType failed to set compression type", expected,
MultiRFileOutputFormatter.getCompressionType(conf));
} finally {
@@ -287,7 +377,7 @@ public void testSetFileType() {
Assert.assertTrue("MultiRFileOutputFormatter#setFileType failed to add a File Type property",
MultiRFileOutputFormatterTest.mockedConfiguration.containsKey(typeKey));
- Assert.assertEquals("MultiRFileOutputFormatter#setFileType failed to retain the expected File Type property value", "fileType",
+ assertEquals("MultiRFileOutputFormatter#setFileType failed to retain the expected File Type property value", "fileType",
MultiRFileOutputFormatterTest.mockedConfiguration.get(typeKey));
} finally {
@@ -323,13 +413,11 @@ public void testSetAccumuloConfiguration() {
String passwordKey = String.format("%s.password", MultiRFileOutputFormatter.class.getName());
String zooKeeperKey = String.format("%s.zookeepers", MultiRFileOutputFormatter.class.getName());
- Assert.assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected instance value.", instance,
- conf.get(instanceKey));
- Assert.assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected username value.", username,
- conf.get(usernameKey));
- Assert.assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected password value.",
+ assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected instance value.", instance, conf.get(instanceKey));
+ assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected username value.", username, conf.get(usernameKey));
+ assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected password value.",
new String(Base64.encodeBase64(password)), conf.get(passwordKey));
- Assert.assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected zookeepers value.", zookeepers,
+ assertEquals("MultiRFileOutputFormatter#setAccumuloConfiguration failed to retain the expected zookeepers value.", zookeepers,
conf.get(zooKeeperKey));
} finally {
@@ -421,8 +509,7 @@ public long getLength() throws IOException {
@Override
public void append(Key key, Value value) throws IOException {}
- });
-
+ }, false);
}
};
@@ -434,7 +521,7 @@ public void before() {
conf = new Configuration();
conf.set("mapred.output.dir", "/tmp");
conf.set(SplitsFile.CONFIGURED_SHARDED_TABLE_NAMES, TableName.SHARD);
-
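+ // Assumption: ImportMode.V2_LOAD_PLANNING directs the formatter to record an Accumulo bulk-import load plan for the RFiles it writes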
+ conf.setEnum(BULK_IMPORT_MODE_CONFIG, ImportMode.V2_LOAD_PLANNING);
}
@Test
@@ -490,8 +577,10 @@ public void testRFileFileSizeLimitWithFilePerShardLoc() throws IOException, Inte
public void testRFileFileSizeLimit() throws IOException, InterruptedException {
// each key we write is 16 characters total, so a limit of 32 should allow two keys per file
MultiRFileOutputFormatter.setRFileLimits(conf, 0, 32);
- RecordWriter<BulkIngestKey,Value> writer = createWriter(formatter, conf);
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1));
+ RecordWriter<BulkIngestKey,Value> writer = createWriter(formatter, context);
writeShardPairs(writer, 3);
+ // writer.close(context);
assertNumFileNames(4);
assertFileNameForShardIndex(0);
expectShardFiles(3);
@@ -528,6 +617,11 @@ private RecordWriter createWriter(MultiRFileOutputFormatter
return formatter.getRecordWriter(context);
}
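+ // Overload accepting a caller-supplied TaskAttemptContext, so tests (e.g., testRFileFileSizeLimit) can construct the context themselves and reuse it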
+ private RecordWriter<BulkIngestKey,Value> createWriter(MultiRFileOutputFormatter formatter, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return formatter.getRecordWriter(context);
+ }
+
private void assertFileNameForShard(int index, String prefix, int shardId) {
Assert.assertTrue(filenames.get(index).endsWith("/shard/" + prefix + "-m-00001_" + shardId + ".rf"));
}
@@ -537,6 +631,6 @@ private void writeShardEntry(RecordWriter writer, int shard
}
private void assertNumFileNames(int expectedNumFiles) {
- Assert.assertEquals(filenames.toString(), expectedNumFiles, filenames.size());
+ assertEquals(filenames.toString(), expectedNumFiles, filenames.size());
}
}
diff --git a/warehouse/ingest-core/src/test/resources/datawave/rfiles/metadata/I3abcdef01.rf b/warehouse/ingest-core/src/test/resources/datawave/rfiles/metadata/I3abcdef01.rf
new file mode 100644
index 00000000000..577df6b96ce
Binary files /dev/null and b/warehouse/ingest-core/src/test/resources/datawave/rfiles/metadata/I3abcdef01.rf differ
diff --git a/warehouse/ingest-core/src/test/resources/datawave/rfiles/shard/I2abcdef01.rf b/warehouse/ingest-core/src/test/resources/datawave/rfiles/shard/I2abcdef01.rf
new file mode 100644
index 00000000000..886bdd374d5
Binary files /dev/null and b/warehouse/ingest-core/src/test/resources/datawave/rfiles/shard/I2abcdef01.rf differ
diff --git a/warehouse/ingest-scripts/src/main/resources/bin/ingest/map-file-bulk-loader.sh b/warehouse/ingest-scripts/src/main/resources/bin/ingest/map-file-bulk-loader.sh
index 52e658a3c46..84aaa379755 100755
--- a/warehouse/ingest-scripts/src/main/resources/bin/ingest/map-file-bulk-loader.sh
+++ b/warehouse/ingest-scripts/src/main/resources/bin/ingest/map-file-bulk-loader.sh
@@ -29,6 +29,8 @@ done
WORKDIR=${BASE_WORK_DIR}/
PATTERN="'*'"
+CLASSPATH=$CLASSPATH:../../config # log4j resolves -Dlog4j.configuration=log4j-bulkloader.xml as a classpath resource, so the config directory (not the xml file itself) must be on the classpath
+
if [[ -e "$INGEST_HADOOP_HOME/share/hadoop/tools/lib/hadoop-$HADOOP_VERSION-distcp.jar" ]]; then
# standard naming
export HADOOP_CLASSPATH=$CLASSPATH:$INGEST_HADOOP_HOME/share/hadoop/tools/lib/hadoop-$HADOOP_VERSION-distcp.jar
@@ -64,7 +66,7 @@ if [[ ! -z $JOB_OBSERVERS ]]; then
OBSERVER_OPTS="-jobObservers $JOB_OBSERVERS $JOB_OBSERVER_EXTRA_OPTS"
fi
-export HADOOP_OPTS=" ${HADOOP_INGEST_OPTS} -Dapp=bulkIngestMapFileLoader -DshutdownPort=$shutdownPort -Dfile.encoding=UTF8 -Duser.timezone=GMT"
+export HADOOP_OPTS=" ${HADOOP_INGEST_OPTS} -Dlog4j.configuration=log4j-bulkloader.xml -Dapp=bulkIngestMapFileLoader -DshutdownPort=$shutdownPort -Dfile.encoding=UTF8 -Duser.timezone=GMT"
$MAP_FILE_LOADER_COMMAND_PREFIX $INGEST_HADOOP_HOME/bin/hadoop --config $WAREHOUSE_HADOOP_CONF jar ${DATAWAVE_INGEST_CORE_JAR} datawave.ingest.mapreduce.job.BulkIngestMapFileLoader $WORKDIR $PATTERN $WAREHOUSE_INSTANCE_NAME $WAREHOUSE_ZOOKEEPERS $USERNAME $PASSWORD -majcThreshold ${MAP_LOADER_MAJC_THRESHOLD} -sleepTime 5000 -numHdfsThreads 100 -numThreads 20 -majcCheckInterval 1 -maxDirectories 200 -numAssignThreads 60 -seqFileHdfs $INGEST_HDFS_NAME_NODE -srcHdfs $WAREHOUSE_HDFS_NAME_NODE -destHdfs $WAREHOUSE_HDFS_NAME_NODE -jt $WAREHOUSE_JOBTRACKER_NODE $OBSERVER_OPTS $MAP_FILE_LOADER_EXTRA_ARGS $EXTRA_ARGS ${INGEST_CONFIG[@]}
RETURN_CODE=$?