diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0f36671 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +toolkit.xml +*StreamsModel.java +/com.ibm.streamsx.hdfs/com.ibm.streamsx.hdfs/InputFormatReader +tests/*/*/output diff --git a/com.ibm.streamsx.hdfs/.classpath b/com.ibm.streamsx.hdfs/.classpath index 06951b2..2ceb108 100644 --- a/com.ibm.streamsx.hdfs/.classpath +++ b/com.ibm.streamsx.hdfs/.classpath @@ -8,5 +8,6 @@ + diff --git a/com.ibm.streamsx.hdfs/build.xml b/com.ibm.streamsx.hdfs/build.xml index 513b60c..9254f18 100644 --- a/com.ibm.streamsx.hdfs/build.xml +++ b/com.ibm.streamsx.hdfs/build.xml @@ -51,11 +51,15 @@ + + + + diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java new file mode 100644 index 0000000..c5ac453 --- /dev/null +++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java @@ -0,0 +1,320 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ + +package com.ibm.streamsx.hdfs; + + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.log4j.Logger; + +import com.ibm.streams.operator.AbstractOperator; +import com.ibm.streams.operator.Attribute; +import com.ibm.streams.operator.OperatorContext; +import com.ibm.streams.operator.OperatorContext.ContextCheck; +import com.ibm.streams.operator.OutputTuple; +import com.ibm.streams.operator.StreamSchema; +import com.ibm.streams.operator.StreamingData.Punctuation; +import com.ibm.streams.operator.StreamingOutput; +import com.ibm.streams.operator.Type.MetaType; +import com.ibm.streams.operator.compile.OperatorContextChecker; +import com.ibm.streams.operator.model.Libraries; +import com.ibm.streams.operator.model.OutputPortSet; +import com.ibm.streams.operator.model.OutputPortSet.WindowPunctuationOutputMode; +import com.ibm.streams.operator.model.OutputPorts; +import com.ibm.streams.operator.model.Parameter; +import com.ibm.streams.operator.model.PrimitiveOperator; +import java.io.File; + +/** + * Operator to read input formats. + */ +@Libraries({"opt/downloaded/*","opt/inputformatreader/*"}) +@PrimitiveOperator(name="InputFormatReader", namespace="com.ibm.streamsx.hdfs", +description="Java Operator InputFormatReader") +@OutputPorts({@OutputPortSet(description="The operator will populate the key and value attribute on this port with the key-value pairs given by the record reader.", cardinality=1, optional=false, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating), @OutputPortSet(description="Optional output ports", optional=true, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating)}) +public class InputFormatReader extends AbstractOperator { + + public static final String consistentCutIntroducer="\\n\\n**Behavior in a consistent region**\\n\\n"; + + public static final String description = "The operator uses the InputFormat interface to read data from HDFS. Use the fileType parameter to set the input format type. Two are currently supported--text and sequence. The default is text. "+ + "The text format supports both compressed and uncompressed text."+ + " The operator has one output port It has two optional attributes, key and value. These correspond to the key and value returned by the underlying RecordReader."+ + "**Configurating the operator**"+ + "To configure the operator, set the configResource parameter to your core-site.xml file."+ + consistentCutIntroducer+ + "The operator has no consistent region support."; + private static final String INPUT_PARAM_NAME = "file"; + private static final String FILETYPE_PARAM_NAME="fileType"; + Logger logger = Logger.getLogger(this.getClass()); + + public static enum FileType { + text, + sequence + } + + private FileType fileType = FileType.text; + + @Parameter(name=FILETYPE_PARAM_NAME,description="Type of file. Use text for uncompressed and uncompressed text files and sequence for sequence files",optional=true) + public void setFileType(FileType type) { + fileType = type; + } + + + + + private static final String CONFIG_RESOURCES_PARAM_NAME = "configResources"; + + + List splits; + /** + * Thread for calling produceTuples() to produce tuples + */ + private Thread processThread; + + + + private String inputFiles[]; + private String configResources[]; + FileInputFormat inputFormat; + int keyIndex = -1; + int valueIndex = -1; + + int channel = -1; + int maxChannels = -1; + Configuration conf; + + + @Parameter(name=INPUT_PARAM_NAME,description="Paths to be used as input") + public void setFile(String[] files ) { + inputFiles = files; + } + + @Parameter(name=CONFIG_RESOURCES_PARAM_NAME,optional=true,description="Resources to be added to the configuration") + public void setResources(String[] paths) { + configResources = paths; + } + + @ContextCheck(compile=true) + public static void compileChecks(OperatorContextChecker checker) { + StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema(); + + Attribute keyAttr = outSchema.getAttribute("key"); + Attribute valueAttr = outSchema.getAttribute("value"); + + + if (keyAttr == null && valueAttr == null) { + checker.setInvalidContext("Either key or value must be on output stream ", new Object[0]); + } + + if (checker.getOperatorContext().getNumberOfStreamingOutputs() != 1) { + checker.setInvalidContext("Number of streaming outputs must be 1",new Object[0]); + } + } + + + @ContextCheck(compile=false) + public static void runtimeCheck(OperatorContextChecker checker) { + StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema(); + + Attribute keyAttr = outSchema.getAttribute("key"); + Attribute valueAttr = outSchema.getAttribute("value"); + + + if (keyAttr != null && keyAttr.getType().getMetaType() != MetaType.INT64) { + checker.setInvalidContext("Type of key on output stream must be int64", new Object[0]); + } + + if (valueAttr != null && valueAttr.getType().getMetaType() != MetaType.RSTRING + && valueAttr.getType().getMetaType() != MetaType.USTRING) { + checker.setInvalidContext("Type of value on output stream must be RString or ustring", new Object[0]); + } + + } + + /** + * Initialize this operator. Called once before any tuples are processed. + * @param context OperatorContext for this operator. + * @throws Exception Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public synchronized void initialize(OperatorContext context) + throws Exception { + // Must call super.initialize(context) to correctly setup an operator. + super.initialize(context); + Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); + + + StreamingOutput out = context.getStreamingOutputs().get(0); + StreamSchema outSchema = out.getStreamSchema(); + + channel = context.getChannel(); + maxChannels = context.getMaxChannels(); + + // This will make the for loop in process tuples work right--we take splits + // channel + k* maxChannels. + if (channel < 0) { + channel = 0; + maxChannels = 1; + } + + if (outSchema.getAttribute("key") != null) { + keyIndex = outSchema.getAttributeIndex("key"); + } + if (outSchema.getAttribute("value") != null) { + valueIndex = outSchema.getAttributeIndex("value"); + } + + // Establish input splits. + try { + + //Class> inputFormatClass= (Class>) Class.forName(inputFormatClassname); + //FileInputFormat inputFormat = inputFormatClass.newInstance(); + + if (fileType == FileType.text) { + inputFormat = new TextInputFormat(); + } + else { + inputFormat = new SequenceFileInputFormat(); + } + + conf = new Configuration(); + if (configResources != null ) { + for (String s : configResources) { + File toAdd = new File(s); + String pathToFile; + if (toAdd.isAbsolute()) { + pathToFile = toAdd.getAbsolutePath(); + } + else { + pathToFile = context.getPE().getApplicationDirectory()+File.separator+s; + toAdd = new File(pathToFile); + } + if (!toAdd.exists()) { + throw new Exception("Specified configuration file "+s+" not found at "+pathToFile); + } + logger.info("Adding "+pathToFile+" as config resource"); + conf.addResource(new Path(pathToFile)); + } + String defaultFS = conf.get("fs.defaultFS"); + if (!defaultFS.startsWith("hdfs")) { + logger.warn("Default file system not HDFS; may be configuration problem"); + } + logger.debug("Default file system is "+defaultFS); + } + Job job = Job.getInstance(conf); + + for (String p : inputFiles) { + FileInputFormat.addInputPath(job, new Path(p)); + } + + splits = inputFormat.getSplits(job); + logger.info("There are "+splits.size()+" splits"); + } + catch (IOException e ) { + throw e; + } + + + processThread = getOperatorContext().getThreadFactory().newThread( + new Runnable() { + + @Override + public void run() { + try { + produceTuples(); + } catch (Exception e) { + Logger.getLogger(this.getClass()).error("Operator error", e); + } + } + + }); + + /* + * Set the thread not to be a daemon to ensure that the SPL runtime + * will wait for the thread to complete before determining the + * operator is complete. + */ + processThread.setDaemon(false); + } + + /** + * Notification that initialization is complete and all input and output ports + * are connected and ready to receive and submit tuples. + * @throws Exception Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public synchronized void allPortsReady() throws Exception { + OperatorContext context = getOperatorContext(); + Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); + // Start a thread for producing tuples because operator + // implementations must not block and must return control to the caller. + processThread.start(); + } + + /** + * Submit new tuples to the output stream + * @throws Exception if an error occurs while submitting a tuple + */ + private void produceTuples() throws Exception { + final StreamingOutput out = getOutput(0); + + for (int i = channel; i < splits.size(); i = i + maxChannels) { + if (logger.isInfoEnabled()) { + logger.info("Handling split "+i); + } + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID("channel "+channel+" of "+maxChannels,0,TaskType.MAP,i,0)); + + RecordReader reader = inputFormat.createRecordReader(splits.get(i),context); + reader.initialize(splits.get(i), context); + + while (reader.nextKeyValue()) { + OutputTuple toSend = out.newTuple(); + // TODO set filename, if it makes sense. + if (keyIndex >= 0) { + toSend.setLong(keyIndex, reader.getCurrentKey().get()); + } + if (valueIndex >= 0) { + toSend.setString(valueIndex, reader.getCurrentValue().toString()); + } + out.submit(toSend); + } + out.punctuate(Punctuation.WINDOW_MARKER); + } + out.punctuate(Punctuation.FINAL_MARKER); + } + + /** + * Shutdown this operator, which will interrupt the thread + * executing the produceTuples() method. + * @throws Exception Operator failure, will cause the enclosing PE to terminate. + */ + public synchronized void shutdown() throws Exception { + if (processThread != null) { + processThread.interrupt(); + processThread = null; + } + OperatorContext context = getOperatorContext(); + Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); + + // TODO: If needed, close connections or release resources related to any external system or data store. + + // Must call super.shutdown() + super.shutdown(); + } +} diff --git a/com.ibm.streamsx.hdfs/info.xml b/com.ibm.streamsx.hdfs/info.xml index 54d56f4..a5fa74d 100644 --- a/com.ibm.streamsx.hdfs/info.xml +++ b/com.ibm.streamsx.hdfs/info.xml @@ -156,7 +156,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit 9. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio. - 2.0.0 + 2.1.0 4.0.0.0 diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore b/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore new file mode 100644 index 0000000..65f3cd7 --- /dev/null +++ b/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore @@ -0,0 +1 @@ +ibm-compression.jar diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar new file mode 100644 index 0000000..afcfc70 Binary files /dev/null and b/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar new file mode 100644 index 0000000..ce4b9ff Binary files /dev/null and b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar new file mode 100644 index 0000000..2d4689a Binary files /dev/null and b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar new file mode 100644 index 0000000..1deef14 Binary files /dev/null and b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar new file mode 100644 index 0000000..47a5383 Binary files /dev/null and b/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar differ diff --git a/demos/WordCount/.project b/demos/WordCount/.project new file mode 100644 index 0000000..03df5a9 --- /dev/null +++ b/demos/WordCount/.project @@ -0,0 +1,34 @@ + + + WordCount + + + + + + org.eclipse.ui.externaltools.ExternalToolBuilder + full,incremental, + + + LaunchConfigHandle + <project>/.externalToolBuilders/org.eclipse.jdt.core.javabuilder.launch + + + + + com.ibm.streams.studio.splproject.builder.SPLProjectBuilder + + + + + org.eclipse.xtext.ui.shared.xtextBuilder + + + + + + com.ibm.streams.studio.splproject.SPLProjectNature + org.eclipse.xtext.ui.shared.xtextNature + org.eclipse.jdt.core.javanature + + diff --git a/demos/WordCount/ReadAndTokenize.spl b/demos/WordCount/ReadAndTokenize.spl new file mode 100644 index 0000000..11140d2 --- /dev/null +++ b/demos/WordCount/ReadAndTokenize.spl @@ -0,0 +1,46 @@ +use com.ibm.streamsx.hdfs::InputFormatReader ; +type EmailTuple = rstring ID, rstring From, rstring Date, rstring Subject, + rstring ToList, rstring CcList, rstring BccList, rstring Body ; + +composite ReadAndTokenize(output WordStream ) +{ + param + expression $inputFile ; + expression $configFile : "etc/core-site.xml" ; + graph + stream TuplesAsString = InputFormatReader() + { + param + file : $inputFile ; + configResources : $configFile ; + } + + stream Body = Functor(TuplesAsString) + { + output + Body : + // body = "This is a test"; + body =((EmailTuple) value).Body ; + } + + stream WordStream = Custom(Body) + { + logic + onTuple Body : + { + list words = tokenize(body, ",\r\n \t", false) ; + for(rstring w in words) + { + submit({ word = w }, WordStream) ; + } + + } + + onPunct Body : + { + submit(currentPunct(), WordStream) ; + } + + } + +} diff --git a/demos/WordCount/ReadTokenizeCombine.spl b/demos/WordCount/ReadTokenizeCombine.spl new file mode 100644 index 0000000..409f0f4 --- /dev/null +++ b/demos/WordCount/ReadTokenizeCombine.spl @@ -0,0 +1,56 @@ +use com.ibm.streamsx.hdfs::InputFormatReader ; + +composite ReadTokenizeCombine(output WordCounts ) +{ + param + expression $inputFile ; + expression $configFile : "etc/core-site.xml" ; + graph + stream TuplesAsString = InputFormatReader() + { + param + file : $inputFile ; + configResources : $configFile ; + } + + stream Body = Functor(TuplesAsString) + { + output + Body : + // body = "This is a test"; + body =((EmailTuple) value).Body ; + } + + stream WordStream = Custom(Body) + { + logic + onTuple Body : + { + list words = tokenize(body, ",\r\n \t", false) ; + mutable tuple toSubmit = (tuple){}; + for(rstring w in words) + { + toSubmit.word = w; + submit(toSubmit, WordStream) ; + } + + } + + onPunct Body : + { + submit(currentPunct(), WordStream) ; + } + + } + + stream WordCounts = Aggregate(WordStream) + { + window + WordStream : tumbling, punct(), partitioned ; + param + partitionBy : word ; + output + WordCounts : count = Count() ; + } + +} diff --git a/demos/WordCount/WordCount.spl b/demos/WordCount/WordCount.spl new file mode 100644 index 0000000..7c2d208 --- /dev/null +++ b/demos/WordCount/WordCount.spl @@ -0,0 +1,66 @@ + + +composite Reduce(input InStream ; output WordCount) +{ + graph + stream Puncted = Custom(InStream) + { + logic + onTuple InStream : submit(InStream, Puncted) ; + onPunct InStream : + { + if(currentPunct() == Sys.FinalMarker) + { + submit(Sys.WindowMarker, Puncted) ; + submit(Sys.FinalMarker, Puncted) ; + } + + } + + } + + stream WordCount = Aggregate(Puncted as I) + { + window + I : tumbling, punct(), partitioned ; + param + partitionBy : word ; + output + WordCount : count = Count() ; + } + + () as sink = FileSink(WordCount) + { + param + file : "words" +(rstring) getChannel() + ".txt" ; + } + +} + +composite WordCount +{ + param + expression $numMappers :(int32) getSubmissionTimeValue("numMappers", + "1") ; + expression $numReducers :(int32) getSubmissionTimeValue("numReducers", + "1") ; + graph + @parallel(width = $numMappers) + stream WordStream = ReadAndTokenize() + { + param + inputFile : getSubmissionTimeValue("inputFile", "allEnron.txt") ; + config + placement : partitionColocation("map") ; + } + + @parallel(width = $numReducers, partitionBy = [ { port = WordStream, + attributes = [ word ] } ]) + stream WordCount = Reduce(WordStream) + { + config + placement : partitionColocation("reduce") ; + } + +} + diff --git a/demos/WordCount/WordCountCombine.spl b/demos/WordCount/WordCountCombine.spl new file mode 100644 index 0000000..49fdcd9 --- /dev/null +++ b/demos/WordCount/WordCountCombine.spl @@ -0,0 +1,65 @@ + + +composite ReduceFromCombine(input InStream ; output WordCount) +{ + graph + stream Puncted = Custom(InStream) + { + logic + onTuple InStream : submit(InStream, Puncted) ; + onPunct InStream : + { + if(currentPunct() == Sys.FinalMarker) + { + submit(Sys.WindowMarker, Puncted) ; + submit(Sys.FinalMarker, Puncted) ; + } + } + + } + + stream WordCount = Aggregate(Puncted as I) + { + window + I : tumbling, punct(), partitioned ; + param + partitionBy : word ; + output + WordCount : count = Sum(count) ; + } + + () as sink = FileSink(WordCount) + { + param + file : "combine_words" +(rstring) getChannel() + ".txt" ; + } + +} + +composite WordCountCombine +{ + param + expression $numMappers :(int32) getSubmissionTimeValue("numMappers", + "1") ; + expression $numReducers :(int32) getSubmissionTimeValue("numReducers", + "1") ; + graph + @parallel(width = $numMappers) + stream WordStream = ReadTokenizeCombine() + { + param + inputFile : getSubmissionTimeValue("inputFile", "allEnron.txt") ; + config + placement : partitionColocation("map") ; + } + + @parallel(width = $numReducers, partitionBy = [ { port = WordStream, + attributes = [ word ] } ]) + stream WordCount = ReduceFromCombine(WordStream) + { + config + placement : partitionColocation("reduce") ; + } + +} + diff --git a/demos/WordCount/info.xml b/demos/WordCount/info.xml new file mode 100644 index 0000000..8c30255 --- /dev/null +++ b/demos/WordCount/info.xml @@ -0,0 +1,21 @@ + + + + + WordCount + Show how to implement WordCount and WordCountWithCombine + 1.0.0 + 4.0 + + + + com.ibm.streamsx.hdfs + 2.1 + + + diff --git a/tests/InputFormatReader/ReadSequenceParallel/Main.spl b/tests/InputFormatReader/ReadSequenceParallel/Main.spl new file mode 100644 index 0000000..946e375 --- /dev/null +++ b/tests/InputFormatReader/ReadSequenceParallel/Main.spl @@ -0,0 +1,41 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ +use com.ibm.streamsx.hdfs::InputFormatReader; +use hdfs.test.common::CheckCount; + +/** + * Use to check input file reading for both compressed and uncompressed files. + * Also checks that key-value output, key only, and value only output works. + */ + +composite ReadAndCount { + +param expression $inputFile: getSubmissionTimeValue("inputFile"); +expression $configFile: getSubmissionTimeValue("configFile"); +expression> $recordCountArray: (list)getSubmissionTimeValue("recordCountArray"); + +graph + +stream Both = InputFormatReader() { +param +file: $inputFile; +configResources: $configFile; +fileType:sequence; +} + +() as bothCheck = CheckCount(Both) { +param +tupleCount: $recordCountArray[getChannel()]; +} +} + +composite Main { + +graph +@parallel(width=2) +() as channel = ReadAndCount() { + +} + +} + diff --git a/tests/InputFormatReader/ReadSequenceSingle/Main.spl b/tests/InputFormatReader/ReadSequenceSingle/Main.spl new file mode 100644 index 0000000..aecdad5 --- /dev/null +++ b/tests/InputFormatReader/ReadSequenceSingle/Main.spl @@ -0,0 +1,33 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ + +use com.ibm.streamsx.hdfs::InputFormatReader; +use hdfs.test.common::CheckCount; + +/** + * Use to check input file reading for both compressed and uncompressed files. + * Also checks that key-value output, key only, and value only output works. + */ + +composite Main { + +param expression $inputFile: getSubmissionTimeValue("inputFile"); +expression $configFile: getSubmissionTimeValue("configFile"); +expression $recordCount: (int32)getSubmissionTimeValue("recordCount"); + +graph + +stream Both = InputFormatReader() { +param +file: $inputFile; +configResources: $configFile; +fileType:sequence; +} + +() as bothCheck = CheckCount(Both) { +param +tupleCount: $recordCount; +} + + +} diff --git a/tests/InputFormatReader/ReadTextParallel/Main.spl b/tests/InputFormatReader/ReadTextParallel/Main.spl new file mode 100644 index 0000000..fce3e33 --- /dev/null +++ b/tests/InputFormatReader/ReadTextParallel/Main.spl @@ -0,0 +1,41 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ + +use com.ibm.streamsx.hdfs::InputFormatReader; +use hdfs.test.common::CheckCount; + +/** + * Use to check input file reading for both compressed and uncompressed files. + * Also checks that key-value output, key only, and value only output works. + */ + +composite ReadAndCount { + +param expression $inputFile: getSubmissionTimeValue("inputFile"); +expression $configFile: getSubmissionTimeValue("configFile"); +expression> $recordCountArray: (list)getSubmissionTimeValue("recordCountArray"); + +graph + +stream Both = InputFormatReader() { +param +file: $inputFile; +configResources: $configFile; +} + +() as bothCheck = CheckCount(Both) { +param +tupleCount: $recordCountArray[getChannel()]; +} +} + +composite Main { + +graph +@parallel(width=2) +() as channel = ReadAndCount() { + +} + +} + diff --git a/tests/InputFormatReader/ReadTextSingle/Main.spl b/tests/InputFormatReader/ReadTextSingle/Main.spl new file mode 100644 index 0000000..75f4480 --- /dev/null +++ b/tests/InputFormatReader/ReadTextSingle/Main.spl @@ -0,0 +1,51 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ + +use com.ibm.streamsx.hdfs::InputFormatReader; +use hdfs.test.common::CheckCount; + +/** + * Use to check input file reading for both compressed and uncompressed files. + * Also checks that key-value output, key only, and value only output works. + */ + +composite Main { + +param expression $inputFile: getSubmissionTimeValue("inputFile"); +expression $configFile: getSubmissionTimeValue("configFile"); +expression $recordCount: (int32)getSubmissionTimeValue("recordCount"); + +graph + +stream Both = InputFormatReader() { +param +file: $inputFile; +configResources: $configFile; +} + +() as bothCheck = CheckCount(Both) { +param +tupleCount: $recordCount; +} + +stream ValueOnly = InputFormatReader() { +param file: $inputFile; +configResources: $configFile; +} + +() as valueCheck = CheckCount(ValueOnly) { +param +tupleCount: $recordCount; +} + + +stream KeyOnly = InputFormatReader() { +param file: $inputFile; +configResources: $configFile; +} + +() as keyCheck = CheckCount(KeyOnly) { +param tupleCount: $recordCount; +} + +} diff --git a/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl b/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl new file mode 100644 index 0000000..0eabddd --- /dev/null +++ b/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl @@ -0,0 +1,27 @@ +/* Copyright (C) 2015, International Business Machines Corporation */ +/* All Rights Reserved */ + +namespace hdfs.test.common; + +public composite CheckCount(input InStream) { + +param +expression $tupleCount; + +graph +() as checker = Custom(InStream) { + +logic state: { + mutable int32 numTuples = 0; + int32 targetNumber = $tupleCount; +} +onTuple InStream: { + numTuples++; +} +onPunct InStream: + if (currentPunct() == Sys.FinalMarker) { + assert(numTuples==targetNumber, "Received "+(rstring)numTuples+" but expected "+(rstring)targetNumber); + } +} + +}