WIP: Bulk V2
keith-ratcliffe committed Sep 17, 2024
1 parent dced884 commit 393627d
Showing 14 changed files with 927 additions and 66 deletions.
@@ -9,7 +9,7 @@ DW_DATAWAVE_INGEST_HOME="${DW_CLOUD_HOME}/${DW_DATAWAVE_INGEST_SYMLINK}"
# ingest reducers. Set to 1 for standalone instance, but typically set to the first prime number that is less than the
# number of available Accumulo tablet servers...

-DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-1}
+DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-10}
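
The comment above this setting calls for the first prime number below the Accumulo tablet-server count (or 1 for a standalone instance). A minimal sketch of how such a value could be derived, assuming a hypothetical helper and an example count of 12 tservers (DataWave itself does not ship this function):

    largest_prime_below() {
        # Walk downward from count-1 until a prime is found; fall back to 1
        # for the standalone (single-tserver) case.
        local n=$(( $1 - 1 ))
        while (( n > 1 )); do
            local is_prime=1 i
            for (( i = 2; i * i <= n; i++ )); do
                if (( n % i == 0 )); then is_prime=0; break; fi
            done
            if (( is_prime )); then echo "$n"; return; fi
            (( n-- ))
        done
        echo 1
    }

    DW_DATAWAVE_INGEST_NUM_SHARDS=$(largest_prime_below 12)   # -> 11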

# Ingest job logs, etc

@@ -39,7 +39,7 @@ DW_DATAWAVE_INGEST_FLAGFILE_DIR="${DW_DATAWAVE_DATA_DIR}/flags"

# Comma-delimited list of configs for the FlagMaker process(es)

-DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml"}
+DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml,${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-bulk.xml"}
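
Since the value is a comma-delimited list, a launcher would typically split it and start one FlagMaker per entry. A sketch under that assumption (start_flagmaker is a hypothetical stand-in for however the quickstart actually launches the process):

    OLD_IFS="${IFS}" ; IFS=","
    FLAGMAKER_CONFIGS=( ${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS} )
    IFS="${OLD_IFS}"

    for cfg in "${FLAGMAKER_CONFIGS[@]}" ; do
        start_flagmaker "${cfg}"   # hypothetical: one process for live, one for bulk
    done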

# Dir for ingest-related 'pid' files

@@ -72,7 +72,7 @@ DW_DATAWAVE_INGEST_LIVE_DATA_TYPES=${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES:-"wikipe

# Comma-delimited data type identifiers to be ingested via "bulk" ingest, ie via bulk import of RFiles into Accumulo tables

-DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats"}
+DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats,wikipedia,mycsv,myjson"}

DW_DATAWAVE_MAPRED_INGEST_OPTS=${DW_DATAWAVE_MAPRED_INGEST_OPTS:-"-useInlineCombiner -ingestMetricsDisabled"}
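
All of these assignments use bash's ${VAR:-default} form, so any value can be overridden from the calling environment before this file is sourced, e.g.:

    export DW_DATAWAVE_INGEST_NUM_SHARDS=13
    export DW_DATAWAVE_INGEST_BULK_DATA_TYPES="shardStats,wikipedia"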

@@ -30,12 +30,14 @@ tar xf "${DW_DATAWAVE_SERVICE_DIR}/${DW_DATAWAVE_INGEST_DIST}" -C "${TARBALL_BAS

info "DataWave Ingest tarball extracted and symlinked"

source "${THIS_DIR}/fix-hadoop-classpath.sh"

if ! hadoopIsRunning ; then
info "Starting Hadoop, so that we can initialize Accumulo"
hadoopStart
fi

-# Create any Hadoop directories related to Datawave Ingest
+# Create any Hadoop directories needed for live ingest input
if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then

OLD_IFS="${IFS}"
@@ -44,10 +44,25 @@ if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
IFS="${OLD_IFS}"

for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+# Dirs created here should be configured in your live flag maker config (e.g., in config/flag-maker-live.xml)
hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}" || fatal "Failed to create HDFS directory: ${dir}"
done
fi

+# Create any Hadoop directories needed for bulk ingest input
+if [[ -n "${DW_DATAWAVE_INGEST_BULK_DATA_TYPES}" ]] ; then
+
+OLD_IFS="${IFS}"
+IFS=","
+HDFS_RAW_INPUT_DIRS=( ${DW_DATAWAVE_INGEST_BULK_DATA_TYPES} )
+IFS="${OLD_IFS}"
+
+for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+# Dirs created here should be configured in your bulk flag maker config (e.g., in config/flag-maker-bulk.xml)
+hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}-bulk" || fatal "Failed to create HDFS directory: ${dir}-bulk"
+done
+fi
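
With the default type lists above, the two loops should leave one input directory per live type plus one '-bulk' directory per bulk type. A quick sanity check, assuming the default base dir:

    hdfs dfs -ls "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}"
    # Expect wikipedia, mycsv, myjson (live) alongside shardStats-bulk,
    # wikipedia-bulk, mycsv-bulk, myjson-bulk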

#----------------------------------------------------------
# Configure/update Accumulo classpath, set auths, etc
#----------------------------------------------------------
2 changes: 1 addition & 1 deletion properties/dev.properties
@@ -41,7 +41,7 @@ LIVE_CHILD_MAP_MAX_MEMORY_MB=1024
BULK_CHILD_REDUCE_MAX_MEMORY_MB=2048
LIVE_CHILD_REDUCE_MAX_MEMORY_MB=1024

-BULK_INGEST_DATA_TYPES=shardStats
+BULK_INGEST_DATA_TYPES=shardStats,wikipedia,mycsv,myjson
LIVE_INGEST_DATA_TYPES=wikipedia,mycsv,myjson

# Clear out these values if you do not want standard shard ingest.
@@ -91,4 +91,24 @@
<name>partitioner.default.delegate</name>
<value>datawave.ingest.mapreduce.partition.MultiTableRRRangePartitioner</value>
</property>

+<property>
+<name>datawave.ingest.splits.cache.dir</name>
+<value>${WAREHOUSE_HDFS_NAME_NODE}/data/splitsCache</value>
+</property>
+
+<property>
+<name>accumulo.config.cache.path</name>
+<value>${WAREHOUSE_HDFS_NAME_NODE}/data/accumuloConfigCache/accConfCache.txt</value>
+</property>
+
+<property>
+<name>ingest.bulk.import.mode</name>
+<value>V2_LOAD_PLANNING</value>
+<description>
+Must be one of [V1, V2_LOCAL_MAPPING, V2_LOAD_PLANNING]
+(See BulkIngestMapFileLoader.ImportMode)
+</description>
+</property>
+
</configuration>
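
The same import mode could also be supplied per job via Hadoop's generic -D option, assuming the ingest job is launched through ToolRunner/GenericOptionsParser. The jar variable and main class below are placeholders, not the exact command line:

    hadoop jar "${DATAWAVE_INGEST_JAR}" datawave.ingest.mapreduce.job.IngestJob \
        -D ingest.bulk.import.mode=V2_LOAD_PLANNING \
        ...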
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %p [%c{1.}] [%t-%tid] %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.hadoop">
+        <level value="info"/>
+    </logger>
+
+    <root>
+        <priority value="debug"/>
+        <appender-ref ref="CONSOLE"/>
+    </root>
+</log4j:configuration>
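
The diff above does not show the new file's path, but a log4j 1.x config like this would normally be selected through the standard log4j.configuration system property, e.g. (the path is a placeholder):

    export HADOOP_OPTS="${HADOOP_OPTS} -Dlog4j.configuration=file:///path/to/this/config.xml"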
5 changes: 5 additions & 0 deletions warehouse/ingest-core/pom.xml
@@ -14,6 +14,11 @@
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
</dependency>
+<dependency>
+<groupId>com.google.code.gson</groupId>
+<artifactId>gson</artifactId>
+<version>2.9.0</version>
+</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
@@ -130,7 +130,7 @@ public Path createTempFile(FileSystem fs) throws IOException {
do {
    Path parentDirectory = this.cacheFilePath.getParent();
    String fileName = this.cacheFilePath.getName() + "." + count;
-   log.info("Attempting to create " + fileName + "under " + parentDirectory);
+   log.info("Attempting to create " + fileName + " under " + parentDirectory);
    tmpCacheFile = new Path(parentDirectory, fileName);
    count++;
} while (!fs.createNewFile(tmpCacheFile));