HadoopReader (was InputFormatReader)
KIRSTEN W. HILDRUM committed Sep 27, 2015
1 parent e6853e0 commit 7f8541f
Showing 14 changed files with 714 additions and 7 deletions.
9 changes: 9 additions & 0 deletions .gitignore
@@ -0,0 +1,9 @@
com.ibm.streamsx.hdfs/opt/downloaded
com.ibm.streamsx.hdfs/opt/hadoopreader
output
toolkit.xml
data
com.ibm.streamsx.hdfs/com.ibm.streamsx.hdfs/HadoopReader
src-gen
com.ibm.streamsx.hdfs/impl/lib
.toolkitList
24 changes: 21 additions & 3 deletions com.ibm.streamsx.hdfs/build.xml
@@ -17,7 +17,8 @@
<property name="lib.dir" value="lib" />
<property name="test.run.dir" value="tests" />
<property name="test.build.dir" value="${test.run.dir}/bin" />
<property name="ext.downloads.dir" value="opt/downloaded" />
<property name="ext.downloads.dir" value="opt/downloaded" />
<property name="ext.hadoopreader.dir" value="opt/hadoopreader" />
<property name="gensrc.dir" value="${basedir}/impl/java/src-gen" />
<property name="jarfile" value="BigData.jar" />
<property name="spl-mt" value="${streams.install}/bin/spl-make-toolkit" />
@@ -50,7 +51,8 @@

<path id="cp.ext.libs">
<fileset dir="${ext.downloads.dir}"/>
</path>
<fileset dir="${ext.hadoopreader.dir}"/>
</path>


<path id="cp.compile">
@@ -80,7 +82,23 @@
<exec executable="${maven.bin}" failonerror="true">
<arg value="dependency:copy-dependencies"/>
<arg value="-DoutputDirectory=${ext.downloads.dir}"/>
</exec>
</exec>
<copy todir="${ext.hadoopreader.dir}">
<fileset dir="${ext.downloads.dir}">
<include name="avro*.jar"/>
<include name="commons-cli*.jar"/>
<include name="commons-configuration*.jar"/>
<include name="commons-logging*.jar"/>
<include name="hadoop-mapreduce-client-core*.jar"/>
<include name="hadoop-common*.jar"/>
<include name="hadoop-auth*.jar"/>
<include name="hadoop-hdfs*.jar"/>
<include name="commons-collections*.jar"/>
<include name="commons-compress*.jar"/>
<include name="htrace-core*.jar"/>
<include name="servlet-*.jar"/>
</fileset>
</copy>
</target>


@@ -0,0 +1,320 @@
/* Copyright (C) 2015, International Business Machines Corporation */
/* All Rights Reserved */

package com.ibm.streamsx.hdfs;


import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.log4j.Logger;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.Attribute;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.StreamingData.Punctuation;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.model.Libraries;
import com.ibm.streams.operator.model.OutputPortSet;
import com.ibm.streams.operator.model.OutputPortSet.WindowPunctuationOutputMode;
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import java.io.File;

/**
* Operator to read input formats.
*/
@Libraries({"opt/hadoopreader/*"})
@PrimitiveOperator(name="HadoopReader", namespace="com.ibm.streamsx.hdfs",
description="HadoopReader can read text files, compressed text files, and sequence files. If in a parallel region, each channel will handle a separate part of the input, unless unsplittable compression is used. It is based on Hadoop's InputFormat, and so each output tuple is a key-value pair.")
@OutputPorts({@OutputPortSet(description="The operator populates the key and value attributes on this port with the key-value pairs given by the record reader for the underlying file format. Either the key or the value attribute may be omitted.", cardinality=1, optional=false, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating), @OutputPortSet(description="Optional output ports", optional=true, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating)})
public class HadoopReader extends AbstractOperator {

public static final String consistentCutIntroducer="\\n\\n**Behavior in a consistent region**\\n\\n";

public static final String description = "The operator uses the InputFormat interface to read data from HDFS. Use the fileType parameter to set the input format type. Two types are currently supported, text and sequence; the default is text. "+
"The text format supports both compressed and uncompressed text. "+
"The operator has one output port with two optional attributes, key and value. These correspond to the key and value returned by the underlying RecordReader.\\n\\n"+
"**Configuring the operator**\\n\\n"+
"To configure the operator, set the configResources parameter to point to your core-site.xml file."+
consistentCutIntroducer+
"The operator has no consistent region support.";
private static final String INPUT_PARAM_NAME = "file";
private static final String FILETYPE_PARAM_NAME="fileType";
Logger logger = Logger.getLogger(this.getClass());

public static enum FileType {
text,
sequence
}

private FileType fileType = FileType.text;

@Parameter(name=FILETYPE_PARAM_NAME,description="Type of file. Use text for compressed and uncompressed text files, and sequence for sequence files.",optional=true)
public void setFileType(FileType type) {
fileType = type;
}




private static final String CONFIG_RESOURCES_PARAM_NAME = "configResources";


List<InputSplit> splits;
/**
* Thread for calling <code>produceTuples()</code> to produce tuples
*/
private Thread processThread;



private String[] inputFiles;
private String[] configResources;
FileInputFormat<LongWritable,Text> inputFormat;
int keyIndex = -1;
int valueIndex = -1;

int channel = -1;
int maxChannels = -1;
Configuration conf;


@Parameter(name=INPUT_PARAM_NAME,description="Paths to be used as input")
public void setFile(String[] files ) {
inputFiles = files;
}

@Parameter(name=CONFIG_RESOURCES_PARAM_NAME,optional=true,description="Resources to be added to the configuration")
public void setResources(String[] paths) {
configResources = paths;
}

@ContextCheck(compile=true)
public static void compileChecks(OperatorContextChecker checker) {
StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema();

Attribute keyAttr = outSchema.getAttribute("key");
Attribute valueAttr = outSchema.getAttribute("value");


if (keyAttr == null && valueAttr == null) {
checker.setInvalidContext("Either key or value must be on output stream ", new Object[0]);
}

if (checker.getOperatorContext().getNumberOfStreamingOutputs() != 1) {
checker.setInvalidContext("Number of streaming outputs must be 1",new Object[0]);
}
}


@ContextCheck(compile=false)
public static void runtimeCheck(OperatorContextChecker checker) {
StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema();

Attribute keyAttr = outSchema.getAttribute("key");
Attribute valueAttr = outSchema.getAttribute("value");


if (keyAttr != null && keyAttr.getType().getMetaType() != MetaType.INT64) {
checker.setInvalidContext("Type of key on output stream must be int64", new Object[0]);
}

if (valueAttr != null && valueAttr.getType().getMetaType() != MetaType.RSTRING
&& valueAttr.getType().getMetaType() != MetaType.USTRING) {
checker.setInvalidContext("Type of value on output stream must be RString or ustring", new Object[0]);
}

}

/**
* Initialize this operator. Called once before any tuples are processed.
* @param context OperatorContext for this operator.
* @throws Exception Operator failure, will cause the enclosing PE to terminate.
*/
@Override
public synchronized void initialize(OperatorContext context)
throws Exception {
// Must call super.initialize(context) to correctly setup an operator.
super.initialize(context);
Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );


StreamingOutput<OutputTuple> out = context.getStreamingOutputs().get(0);
StreamSchema outSchema = out.getStreamSchema();

channel = context.getChannel();
maxChannels = context.getMaxChannels();

// This makes the for loop in produceTuples() work correctly: each channel
// takes splits channel + k*maxChannels.
if (channel < 0) {
channel = 0;
maxChannels = 1;
}
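// Example: with maxChannels = 3 and 12 splits, channel 0 reads splits
// 0, 3, 6, 9; channel 1 reads 1, 4, 7, 10; and channel 2 reads 2, 5, 8, 11.
// Outside a parallel region, channel/maxChannels are normalized to 0/1 above,
// so a single operator instance reads every split.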

if (outSchema.getAttribute("key") != null) {
keyIndex = outSchema.getAttributeIndex("key");
}
if (outSchema.getAttribute("value") != null) {
valueIndex = outSchema.getAttributeIndex("value");
}

// Establish input splits.

//Class<FileInputFormat<K, V>> inputFormatClass= (Class<FileInputFormat<K, V>>) Class.forName(inputFormatClassname);
//FileInputFormat<K,V> inputFormat = inputFormatClass.newInstance();

if (fileType == FileType.text) {
inputFormat = new TextInputFormat();
}
else {
inputFormat = new SequenceFileInputFormat<LongWritable, Text>();
}

conf = new Configuration();
if (configResources != null ) {
for (String s : configResources) {
File toAdd = new File(s);
String pathToFile;
if (toAdd.isAbsolute()) {
pathToFile = toAdd.getAbsolutePath();
}
else {
pathToFile = context.getPE().getApplicationDirectory()+File.separator+s;
toAdd = new File(pathToFile);
}
if (!toAdd.exists()) {
throw new Exception("Specified configuration file "+s+" not found at "+pathToFile);
}
logger.info("Adding "+pathToFile+" as config resource");
conf.addResource(new Path(pathToFile));
}
String defaultFS = conf.get("fs.defaultFS");
if (defaultFS == null || !defaultFS.startsWith("hdfs")) {
logger.warn("Default file system is not HDFS; this may indicate a configuration problem");
}
logger.debug("Default file system is "+defaultFS);
}
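// Example (hypothetical paths): a relative entry such as "etc/core-site.xml"
// resolves against the application directory, i.e. <appDir>/etc/core-site.xml,
// while an absolute entry such as "/etc/hadoop/conf/core-site.xml" is used as given.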
Job job = Job.getInstance(conf);

for (String p : inputFiles) {
FileInputFormat.addInputPath(job, new Path(p));
}

splits = inputFormat.getSplits(job);
logger.info("There are "+splits.size()+" splits");


processThread = getOperatorContext().getThreadFactory().newThread(
new Runnable() {

@Override
public void run() {
try {
produceTuples();
} catch (Exception e) {
logger.error("Operator error", e);
}
}

});

/*
* Set the thread not to be a daemon to ensure that the SPL runtime
* will wait for the thread to complete before determining the
* operator is complete.
*/
processThread.setDaemon(false);
}

/**
* Notification that initialization is complete and all input and output ports
* are connected and ready to receive and submit tuples.
* @throws Exception Operator failure, will cause the enclosing PE to terminate.
*/
@Override
public synchronized void allPortsReady() throws Exception {
OperatorContext context = getOperatorContext();
Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );
// Start a thread for producing tuples because operator
// implementations must not block and must return control to the caller.
processThread.start();
}

/**
* Submit new tuples to the output stream
* @throws Exception if an error occurs while submitting a tuple
*/
private void produceTuples() throws Exception {
final StreamingOutput<OutputTuple> out = getOutput(0);

for (int i = channel; i < splits.size(); i = i + maxChannels) {
if (logger.isInfoEnabled()) {
logger.info("Handling split "+i);
}
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID("channel "+channel+" of "+maxChannels,0,TaskType.MAP,i,0));

RecordReader<LongWritable,Text> reader = inputFormat.createRecordReader(splits.get(i),context);
reader.initialize(splits.get(i), context);

while (reader.nextKeyValue()) {
OutputTuple toSend = out.newTuple();
// TODO set filename, if it makes sense.
if (keyIndex >= 0) {
toSend.setLong(keyIndex, reader.getCurrentKey().get());
}
if (valueIndex >= 0) {
toSend.setString(valueIndex, reader.getCurrentValue().toString());
}
out.submit(toSend);
}
out.punctuate(Punctuation.WINDOW_MARKER);
}
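// All splits assigned to this channel are done; the final marker tells
// downstream operators that this source will produce no more tuples.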
out.punctuate(Punctuation.FINAL_MARKER);
}

/**
* Shutdown this operator, which will interrupt the thread
* executing the <code>produceTuples()</code> method.
* @throws Exception Operator failure, will cause the enclosing PE to terminate.
*/
@Override
public synchronized void shutdown() throws Exception {
if (processThread != null) {
processThread.interrupt();
processThread = null;
}
OperatorContext context = getOperatorContext();
Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );

// TODO: If needed, close connections or release resources related to any external system or data store.

// Must call super.shutdown()
super.shutdown();
}
}
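For context, here is a minimal SPL invocation sketch for the operator added above. The HDFS URI, input path, and parallel width are hypothetical; the parameter names (file, fileType, configResources) and the int64 key / rstring value output schema follow the operator code.

    // Hypothetical usage sketch; adjust the URI, path, and width for your cluster.
    composite ReadWithHadoopReader {
        graph
            // With width 3, each channel reads every third input split.
            @parallel(width = 3)
            stream<int64 key, rstring value> Records = com.ibm.streamsx.hdfs::HadoopReader() {
                param
                    file            : "hdfs://namenode.example.com:8020/user/streams/input/";
                    fileType        : text;
                    configResources : "etc/core-site.xml";
            }
    }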
4 changes: 2 additions & 2 deletions com.ibm.streamsx.hdfs/info.xml
@@ -156,8 +156,8 @@ Alternatively, you can fully qualify the operators that are provided by toolkit
9. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.

</description>
<version>2.0.0</version>
<version>2.1.0</version>
<requiredProductVersion>4.0.0.0</requiredProductVersion>
</identity>
<dependencies/>
</toolkitInfoModel>
</toolkitInfoModel>
4 changes: 2 additions & 2 deletions com.ibm.streamsx.hdfs/pom.xml
@@ -21,8 +21,8 @@
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.1.2-SNAPSHOT</version>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
<scope>compile</scope>
</dependency>
</dependencies>