WIP: Bulk V2 #2568

Open · wants to merge 2 commits into base: integration
@@ -9,7 +9,7 @@ DW_DATAWAVE_INGEST_HOME="${DW_CLOUD_HOME}/${DW_DATAWAVE_INGEST_SYMLINK}"
# ingest reducers. Set to 1 for standalone instance, but typically set to the first prime number that is less than the
# number of available Accumulo tablet servers...

-DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-1}
+DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-10}

# Ingest job logs, etc

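The comment above sizes the shard count to the first prime below the number of tablet servers. That rule is easy to script; a minimal sketch, assuming bash and a hypothetical helper name (not part of the quickstart):

largest_prime_below() {
    # Largest prime strictly less than $1 (falls back to 1).
    local n=$(( $1 - 1 ))
    while (( n > 1 )); do
        local is_prime=1 i
        for (( i = 2; i * i <= n; i++ )); do
            (( n % i == 0 )) && { is_prime=0; break; }
        done
        (( is_prime )) && { echo "$n"; return; }
        (( n-- ))
    done
    echo 1
}

export DW_DATAWAVE_INGEST_NUM_SHARDS="$(largest_prime_below 12)"   # e.g., 12 tservers -> 11
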
@@ -39,7 +39,7 @@ DW_DATAWAVE_INGEST_FLAGFILE_DIR="${DW_DATAWAVE_DATA_DIR}/flags"

# Comma-delimited list of configs for the FlagMaker process(es)

-DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml"}
+DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml,${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-bulk.xml"}

# Dir for ingest-related 'pid' files

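Since the default list now names both configs, the quickstart presumably starts one FlagMaker per entry. To run only the bulk one, override the variable before bootstrapping; a usage sketch using only values defined above:

export DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS="${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-bulk.xml"
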
@@ -72,7 +72,7 @@ DW_DATAWAVE_INGEST_LIVE_DATA_TYPES=${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES:-"wikipe

# Comma-delimited data type identifiers to be ingested via "bulk" ingest, ie via bulk import of RFiles into Accumulo tables

-DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats"}
+DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats,wikipedia,mycsv,myjson"}

DW_DATAWAVE_MAPRED_INGEST_OPTS=${DW_DATAWAVE_MAPRED_INGEST_OPTS:-"-useInlineCombiner -ingestMetricsDisabled"}

install-ingest.sh
@@ -30,12 +30,14 @@ tar xf "${DW_DATAWAVE_SERVICE_DIR}/${DW_DATAWAVE_INGEST_DIST}" -C "${TARBALL_BAS

info "DataWave Ingest tarball extracted and symlinked"

+source "${THIS_DIR}/fix-hadoop-classpath.sh"

if ! hadoopIsRunning ; then
info "Starting Hadoop, so that we can initialize Accumulo"
hadoopStart
fi

-# Create any Hadoop directories related to Datawave Ingest
+# Create any Hadoop directories needed for live ingest input
if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then

OLD_IFS="${IFS}"
@@ -44,10 +46,25 @@ if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
IFS="${OLD_IFS}"

for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+# Dirs created here should be configured in your live flag maker config (e.g., in config/flag-maker-live.xml)
hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}" || fatal "Failed to create HDFS directory: ${dir}"
done
fi

+# Create any Hadoop directories needed for bulk ingest input
+if [[ -n "${DW_DATAWAVE_INGEST_BULK_DATA_TYPES}" ]] ; then
+
+OLD_IFS="${IFS}"
+IFS=","
+HDFS_RAW_INPUT_DIRS=( ${DW_DATAWAVE_INGEST_BULK_DATA_TYPES} )
+IFS="${OLD_IFS}"
+
+for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+# Dirs created here should be configured in your bulk flag maker config (e.g., in config/flag-maker-bulk.xml)
+hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}-bulk" || fatal "Failed to create HDFS directory: ${dir}-bulk"
+done
+fi

#----------------------------------------------------------
# Configure/update Accumulo classpath, set auths, etc
#----------------------------------------------------------
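With the defaults above, the two loops create one input dir per type: live types keep their usual ${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/<type> dirs, while each bulk type gets a parallel <type>-bulk dir. A quick post-install check (a sketch; exact entries depend on the configured type lists):

hdfs dfs -ls "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}"
# with the new defaults, expect entries such as shardStats-bulk,
# wikipedia-bulk, mycsv-bulk, and myjson-bulk alongside the live dirs
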
properties/dev.properties (1 addition, 1 deletion)
@@ -41,7 +41,7 @@ LIVE_CHILD_MAP_MAX_MEMORY_MB=1024
BULK_CHILD_REDUCE_MAX_MEMORY_MB=2048
LIVE_CHILD_REDUCE_MAX_MEMORY_MB=1024

-BULK_INGEST_DATA_TYPES=shardStats
+BULK_INGEST_DATA_TYPES=shardStats,wikipedia,mycsv,myjson
LIVE_INGEST_DATA_TYPES=wikipedia,mycsv,myjson
Collaborator commented:

If you are moving those types to bulk, then remove from live

Collaborator (author) replied:
IIRC, my goal here was just to signal to the user that they can now use either live or bulk for the 3 test data types... although I guess it really doesn't matter in the end, since both live and bulk types here get dumped into the ingest.data.types config list, and that list gets deduped in TypeRegistry.

Makes me wonder: do we really need to maintain separate variables for these? So far, I haven't found a case where the distinction matters to our code.

Anyway, the live flag maker still polls the datatypeName dirs in HDFS, same as always. And the bulk flag maker is now configured to run, polling the new datatypeName-bulk dirs (created in quickstart's install-ingest.sh above).
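
A quick shell illustration of that dedup claim (a sketch of the effective merge, not the actual TypeRegistry code):

echo "${BULK_INGEST_DATA_TYPES},${LIVE_INGEST_DATA_TYPES}" \
    | tr ',' '\n' | awk '!seen[$0]++' | paste -sd, -
# -> shardStats,wikipedia,mycsv,myjson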


# Clear out these values if you do not want standard shard ingest.

@@ -91,4 +91,24 @@
<name>partitioner.default.delegate</name>
<value>datawave.ingest.mapreduce.partition.MultiTableRRRangePartitioner</value>
</property>

+<property>
+<name>datawave.ingest.splits.cache.dir</name>
+<value>${WAREHOUSE_HDFS_NAME_NODE}/data/splitsCache</value>
+</property>
+
+<property>
+<name>accumulo.config.cache.path</name>
+<value>${WAREHOUSE_HDFS_NAME_NODE}/data/accumuloConfigCache/accConfCache.txt</value>
+</property>
+
+<property>
+<name>ingest.bulk.import.mode</name>
+<value>V2_LOAD_PLANNING</value>
+<description>
+Must be one of [V1, V2_LOCAL_MAPPING, V2_LOAD_PLANNING]
+(See BulkIngestMapFileLoader.ImportMode)
+</description>
+</property>

</configuration>
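Since the description pins the allowed values, a launch script could fail fast on a bad setting. A minimal sketch, assuming xmllint is available and the config file is named bulk-ingest-config.xml (name illustrative):

mode=$(xmllint --xpath 'string(//property[name="ingest.bulk.import.mode"]/value)' bulk-ingest-config.xml)
case "$mode" in
    V1|V2_LOCAL_MAPPING|V2_LOAD_PLANNING) echo "ingest.bulk.import.mode=$mode (ok)" ;;
    *) echo "unsupported ingest.bulk.import.mode: $mode" >&2; exit 1 ;;
esac
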
@@ -0,0 +1,20 @@ (new file)
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+<param name="Target" value="System.out"/>
+<layout class="org.apache.log4j.PatternLayout">
+<param name="ConversionPattern" value="%d{ISO8601} %p [%c{1.}] [%t-%tid] %m%n"/>
+</layout>
+</appender>
+
+<logger name="org.apache.hadoop">
+<level value="info"/>
+</logger>
+
+<root>
+<priority value="debug"/>
+<appender-ref ref="CONSOLE"/>
+</root>
+</log4j:configuration>
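This is a stock log4j 1.x config: a console appender, DEBUG at the root, and org.apache.hadoop capped at INFO. To point a JVM at it, set the log4j.configuration system property to a URL; the path below is illustrative:

export HADOOP_OPTS="${HADOOP_OPTS} -Dlog4j.configuration=file:///path/to/log4j.xml"
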
warehouse/ingest-core/pom.xml (5 additions)
@@ -14,6 +14,11 @@
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
</dependency>
+<dependency>
+<groupId>com.google.code.gson</groupId>
+<artifactId>gson</artifactId>
+<version>2.9.0</version>
+</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>

@@ -130,7 +130,7 @@ public Path createTempFile(FileSystem fs) throws IOException {
do {
Path parentDirectory = this.cacheFilePath.getParent();
String fileName = this.cacheFilePath.getName() + "." + count;
log.info("Attempting to create " + fileName + "under " + parentDirectory);
log.info("Attempting to create " + fileName + " under " + parentDirectory);
tmpCacheFile = new Path(parentDirectory, fileName);
count++;
} while (!fs.createNewFile(tmpCacheFile));