diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 0f36671..0000000 --- a/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -toolkit.xml -*StreamsModel.java -/com.ibm.streamsx.hdfs/com.ibm.streamsx.hdfs/InputFormatReader -tests/*/*/output diff --git a/com.ibm.streamsx.hdfs/.classpath b/com.ibm.streamsx.hdfs/.classpath index 2ceb108..06951b2 100644 --- a/com.ibm.streamsx.hdfs/.classpath +++ b/com.ibm.streamsx.hdfs/.classpath @@ -8,6 +8,5 @@ - diff --git a/com.ibm.streamsx.hdfs/build.xml b/com.ibm.streamsx.hdfs/build.xml index 9254f18..513b60c 100644 --- a/com.ibm.streamsx.hdfs/build.xml +++ b/com.ibm.streamsx.hdfs/build.xml @@ -51,15 +51,11 @@ - - - - diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java deleted file mode 100644 index c5ac453..0000000 --- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/InputFormatReader.java +++ /dev/null @@ -1,320 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ - -package com.ibm.streamsx.hdfs; - - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.log4j.Logger; - -import com.ibm.streams.operator.AbstractOperator; -import com.ibm.streams.operator.Attribute; -import com.ibm.streams.operator.OperatorContext; -import com.ibm.streams.operator.OperatorContext.ContextCheck; -import com.ibm.streams.operator.OutputTuple; -import com.ibm.streams.operator.StreamSchema; -import com.ibm.streams.operator.StreamingData.Punctuation; -import com.ibm.streams.operator.StreamingOutput; -import com.ibm.streams.operator.Type.MetaType; -import com.ibm.streams.operator.compile.OperatorContextChecker; -import com.ibm.streams.operator.model.Libraries; -import com.ibm.streams.operator.model.OutputPortSet; -import com.ibm.streams.operator.model.OutputPortSet.WindowPunctuationOutputMode; -import com.ibm.streams.operator.model.OutputPorts; -import com.ibm.streams.operator.model.Parameter; -import com.ibm.streams.operator.model.PrimitiveOperator; -import java.io.File; - -/** - * Operator to read input formats. 
- */ -@Libraries({"opt/downloaded/*","opt/inputformatreader/*"}) -@PrimitiveOperator(name="InputFormatReader", namespace="com.ibm.streamsx.hdfs", -description="Java Operator InputFormatReader") -@OutputPorts({@OutputPortSet(description="The operator will populate the key and value attribute on this port with the key-value pairs given by the record reader.", cardinality=1, optional=false, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating), @OutputPortSet(description="Optional output ports", optional=true, windowPunctuationOutputMode=WindowPunctuationOutputMode.Generating)}) -public class InputFormatReader extends AbstractOperator { - - public static final String consistentCutIntroducer="\\n\\n**Behavior in a consistent region**\\n\\n"; - - public static final String description = "The operator uses the InputFormat interface to read data from HDFS. Use the fileType parameter to set the input format type. Two are currently supported--text and sequence. The default is text. "+ - "The text format supports both compressed and uncompressed text."+ - " The operator has one output port. It has two optional attributes, key and value. These correspond to the key and value returned by the underlying RecordReader."+ - "**Configuring the operator**"+ - "To configure the operator, set the configResources parameter to your core-site.xml file."+ - consistentCutIntroducer+ - "The operator has no consistent region support."; - private static final String INPUT_PARAM_NAME = "file"; - private static final String FILETYPE_PARAM_NAME="fileType"; - Logger logger = Logger.getLogger(this.getClass()); - - public static enum FileType { - text, - sequence - } - - private FileType fileType = FileType.text; - - @Parameter(name=FILETYPE_PARAM_NAME,description="Type of file. 
Use text for compressed and uncompressed text files and sequence for sequence files",optional=true) - public void setFileType(FileType type) { - fileType = type; - } - - - - - private static final String CONFIG_RESOURCES_PARAM_NAME = "configResources"; - - - List splits; - /** - * Thread for calling produceTuples() to produce tuples - */ - private Thread processThread; - - - - private String inputFiles[]; - private String configResources[]; - FileInputFormat inputFormat; - int keyIndex = -1; - int valueIndex = -1; - - int channel = -1; - int maxChannels = -1; - Configuration conf; - - - @Parameter(name=INPUT_PARAM_NAME,description="Paths to be used as input") - public void setFile(String[] files ) { - inputFiles = files; - } - - @Parameter(name=CONFIG_RESOURCES_PARAM_NAME,optional=true,description="Resources to be added to the configuration") - public void setResources(String[] paths) { - configResources = paths; - } - - @ContextCheck(compile=true) - public static void compileChecks(OperatorContextChecker checker) { - StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema(); - - Attribute keyAttr = outSchema.getAttribute("key"); - Attribute valueAttr = outSchema.getAttribute("value"); - - - if (keyAttr == null && valueAttr == null) { - checker.setInvalidContext("Either key or value must be on output stream ", new Object[0]); - } - - if (checker.getOperatorContext().getNumberOfStreamingOutputs() != 1) { - checker.setInvalidContext("Number of streaming outputs must be 1",new Object[0]); - } - } - - - @ContextCheck(compile=false) - public static void runtimeCheck(OperatorContextChecker checker) { - StreamSchema outSchema= checker.getOperatorContext().getStreamingOutputs().get(0).getStreamSchema(); - - Attribute keyAttr = outSchema.getAttribute("key"); - Attribute valueAttr = outSchema.getAttribute("value"); - - - if (keyAttr != null && keyAttr.getType().getMetaType() != MetaType.INT64) { - checker.setInvalidContext("Type of key on output stream must be int64", new Object[0]); - } - - if (valueAttr != null && valueAttr.getType().getMetaType() != MetaType.RSTRING - && valueAttr.getType().getMetaType() != MetaType.USTRING) { - checker.setInvalidContext("Type of value on output stream must be rstring or ustring", new Object[0]); - } - - } - - /** - * Initialize this operator. Called once before any tuples are processed. - * @param context OperatorContext for this operator. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public synchronized void initialize(OperatorContext context) - throws Exception { - // Must call super.initialize(context) to correctly set up an operator. - super.initialize(context); - Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - - - StreamingOutput out = context.getStreamingOutputs().get(0); - StreamSchema outSchema = out.getStreamSchema(); - - channel = context.getChannel(); - maxChannels = context.getMaxChannels(); - - // This will make the for loop in process tuples work right--we take splits - // channel + k* maxChannels. - if (channel < 0) { - channel = 0; - maxChannels = 1; - } - - if (outSchema.getAttribute("key") != null) { - keyIndex = outSchema.getAttributeIndex("key"); - } - if (outSchema.getAttribute("value") != null) { - valueIndex = outSchema.getAttributeIndex("value"); - } - - // Establish input splits. 
- try { - - //Class> inputFormatClass= (Class>) Class.forName(inputFormatClassname); - //FileInputFormat inputFormat = inputFormatClass.newInstance(); - - if (fileType == FileType.text) { - inputFormat = new TextInputFormat(); - } - else { - inputFormat = new SequenceFileInputFormat(); - } - - conf = new Configuration(); - if (configResources != null ) { - for (String s : configResources) { - File toAdd = new File(s); - String pathToFile; - if (toAdd.isAbsolute()) { - pathToFile = toAdd.getAbsolutePath(); - } - else { - pathToFile = context.getPE().getApplicationDirectory()+File.separator+s; - toAdd = new File(pathToFile); - } - if (!toAdd.exists()) { - throw new Exception("Specified configuration file "+s+" not found at "+pathToFile); - } - logger.info("Adding "+pathToFile+" as config resource"); - conf.addResource(new Path(pathToFile)); - } - String defaultFS = conf.get("fs.defaultFS"); - if (!defaultFS.startsWith("hdfs")) { - logger.warn("Default file system not HDFS; may be configuration problem"); - } - logger.debug("Default file system is "+defaultFS); - } - Job job = Job.getInstance(conf); - - for (String p : inputFiles) { - FileInputFormat.addInputPath(job, new Path(p)); - } - - splits = inputFormat.getSplits(job); - logger.info("There are "+splits.size()+" splits"); - } - catch (IOException e ) { - throw e; - } - - - processThread = getOperatorContext().getThreadFactory().newThread( - new Runnable() { - - @Override - public void run() { - try { - produceTuples(); - } catch (Exception e) { - Logger.getLogger(this.getClass()).error("Operator error", e); - } - } - - }); - - /* - * Set the thread not to be a daemon to ensure that the SPL runtime - * will wait for the thread to complete before determining the - * operator is complete. - */ - processThread.setDaemon(false); - } - - /** - * Notification that initialization is complete and all input and output ports - * are connected and ready to receive and submit tuples. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public synchronized void allPortsReady() throws Exception { - OperatorContext context = getOperatorContext(); - Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - // Start a thread for producing tuples because operator - // implementations must not block and must return control to the caller. - processThread.start(); - } - - /** - * Submit new tuples to the output stream - * @throws Exception if an error occurs while submitting a tuple - */ - private void produceTuples() throws Exception { - final StreamingOutput out = getOutput(0); - - for (int i = channel; i < splits.size(); i = i + maxChannels) { - if (logger.isInfoEnabled()) { - logger.info("Handling split "+i); - } - TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID("channel "+channel+" of "+maxChannels,0,TaskType.MAP,i,0)); - - RecordReader reader = inputFormat.createRecordReader(splits.get(i),context); - reader.initialize(splits.get(i), context); - - while (reader.nextKeyValue()) { - OutputTuple toSend = out.newTuple(); - // TODO set filename, if it makes sense. 
- if (keyIndex >= 0) { - toSend.setLong(keyIndex, reader.getCurrentKey().get()); - } - if (valueIndex >= 0) { - toSend.setString(valueIndex, reader.getCurrentValue().toString()); - } - out.submit(toSend); - } - out.punctuate(Punctuation.WINDOW_MARKER); - } - out.punctuate(Punctuation.FINAL_MARKER); - } - - /** - * Shutdown this operator, which will interrupt the thread - * executing the produceTuples() method. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - public synchronized void shutdown() throws Exception { - if (processThread != null) { - processThread.interrupt(); - processThread = null; - } - OperatorContext context = getOperatorContext(); - Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - - // TODO: If needed, close connections or release resources related to any external system or data store. - - // Must call super.shutdown() - super.shutdown(); - } -} diff --git a/com.ibm.streamsx.hdfs/info.xml b/com.ibm.streamsx.hdfs/info.xml index a5fa74d..54d56f4 100644 --- a/com.ibm.streamsx.hdfs/info.xml +++ b/com.ibm.streamsx.hdfs/info.xml @@ -156,7 +156,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit 9. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio. - 2.1.0 + 2.0.0 4.0.0.0 diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore b/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore deleted file mode 100644 index 65f3cd7..0000000 --- a/com.ibm.streamsx.hdfs/opt/inputformatreader/.gitignore +++ /dev/null @@ -1 +0,0 @@ -ibm-compression.jar diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar deleted file mode 100644 index afcfc70..0000000 Binary files a/com.ibm.streamsx.hdfs/opt/inputformatreader/avro-1.7.4.jar and /dev/null differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar deleted file mode 100644 index ce4b9ff..0000000 Binary files a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-cli-1.2.jar and /dev/null differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar deleted file mode 100644 index 2d4689a..0000000 Binary files a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-configuration-1.6.jar and /dev/null differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar deleted file mode 100644 index 1deef14..0000000 Binary files a/com.ibm.streamsx.hdfs/opt/inputformatreader/commons-logging-1.1.1.jar and /dev/null differ diff --git a/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar b/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar deleted file mode 100644 index 47a5383..0000000 Binary files a/com.ibm.streamsx.hdfs/opt/inputformatreader/hadoop-core.jar and /dev/null differ diff --git a/demos/WordCount/.project b/demos/WordCount/.project deleted file mode 100644 index 03df5a9..0000000 --- a/demos/WordCount/.project +++ /dev/null @@ -1,34 +0,0 @@ - - - WordCount - - - - - - org.eclipse.ui.externaltools.ExternalToolBuilder - full,incremental, - - - LaunchConfigHandle - 
<project>/.externalToolBuilders/org.eclipse.jdt.core.javabuilder.launch - - - - - com.ibm.streams.studio.splproject.builder.SPLProjectBuilder - - - - - org.eclipse.xtext.ui.shared.xtextBuilder - - - - - - com.ibm.streams.studio.splproject.SPLProjectNature - org.eclipse.xtext.ui.shared.xtextNature - org.eclipse.jdt.core.javanature - - diff --git a/demos/WordCount/ReadAndTokenize.spl b/demos/WordCount/ReadAndTokenize.spl deleted file mode 100644 index 11140d2..0000000 --- a/demos/WordCount/ReadAndTokenize.spl +++ /dev/null @@ -1,46 +0,0 @@ -use com.ibm.streamsx.hdfs::InputFormatReader ; -type EmailTuple = rstring ID, rstring From, rstring Date, rstring Subject, - rstring ToList, rstring CcList, rstring BccList, rstring Body ; - -composite ReadAndTokenize(output WordStream ) -{ - param - expression $inputFile ; - expression $configFile : "etc/core-site.xml" ; - graph - stream TuplesAsString = InputFormatReader() - { - param - file : $inputFile ; - configResources : $configFile ; - } - - stream Body = Functor(TuplesAsString) - { - output - Body : - // body = "This is a test"; - body =((EmailTuple) value).Body ; - } - - stream WordStream = Custom(Body) - { - logic - onTuple Body : - { - list words = tokenize(body, ",\r\n \t", false) ; - for(rstring w in words) - { - submit({ word = w }, WordStream) ; - } - - } - - onPunct Body : - { - submit(currentPunct(), WordStream) ; - } - - } - -} diff --git a/demos/WordCount/ReadTokenizeCombine.spl b/demos/WordCount/ReadTokenizeCombine.spl deleted file mode 100644 index 409f0f4..0000000 --- a/demos/WordCount/ReadTokenizeCombine.spl +++ /dev/null @@ -1,56 +0,0 @@ -use com.ibm.streamsx.hdfs::InputFormatReader ; - -composite ReadTokenizeCombine(output WordCounts ) -{ - param - expression $inputFile ; - expression $configFile : "etc/core-site.xml" ; - graph - stream TuplesAsString = InputFormatReader() - { - param - file : $inputFile ; - configResources : $configFile ; - } - - stream Body = Functor(TuplesAsString) - { - output - Body : - // body = "This is a test"; - body =((EmailTuple) value).Body ; - } - - stream WordStream = Custom(Body) - { - logic - onTuple Body : - { - list words = tokenize(body, ",\r\n \t", false) ; - mutable tuple toSubmit = (tuple){}; - for(rstring w in words) - { - toSubmit.word = w; - submit(toSubmit, WordStream) ; - } - - } - - onPunct Body : - { - submit(currentPunct(), WordStream) ; - } - - } - - stream WordCounts = Aggregate(WordStream) - { - window - WordStream : tumbling, punct(), partitioned ; - param - partitionBy : word ; - output - WordCounts : count = Count() ; - } - -} diff --git a/demos/WordCount/WordCount.spl b/demos/WordCount/WordCount.spl deleted file mode 100644 index 7c2d208..0000000 --- a/demos/WordCount/WordCount.spl +++ /dev/null @@ -1,66 +0,0 @@ - - -composite Reduce(input InStream ; output WordCount) -{ - graph - stream Puncted = Custom(InStream) - { - logic - onTuple InStream : submit(InStream, Puncted) ; - onPunct InStream : - { - if(currentPunct() == Sys.FinalMarker) - { - submit(Sys.WindowMarker, Puncted) ; - submit(Sys.FinalMarker, Puncted) ; - } - - } - - } - - stream WordCount = Aggregate(Puncted as I) - { - window - I : tumbling, punct(), partitioned ; - param - partitionBy : word ; - output - WordCount : count = Count() ; - } - - () as sink = FileSink(WordCount) - { - param - file : "words" +(rstring) getChannel() + ".txt" ; - } - -} - -composite WordCount -{ - param - expression $numMappers :(int32) getSubmissionTimeValue("numMappers", - "1") ; - expression $numReducers :(int32) 
getSubmissionTimeValue("numReducers", - "1") ; - graph - @parallel(width = $numMappers) - stream WordStream = ReadAndTokenize() - { - param - inputFile : getSubmissionTimeValue("inputFile", "allEnron.txt") ; - config - placement : partitionColocation("map") ; - } - - @parallel(width = $numReducers, partitionBy = [ { port = WordStream, - attributes = [ word ] } ]) - stream WordCount = Reduce(WordStream) - { - config - placement : partitionColocation("reduce") ; - } - -} - diff --git a/demos/WordCount/WordCountCombine.spl b/demos/WordCount/WordCountCombine.spl deleted file mode 100644 index 49fdcd9..0000000 --- a/demos/WordCount/WordCountCombine.spl +++ /dev/null @@ -1,65 +0,0 @@ - - -composite ReduceFromCombine(input InStream ; output WordCount) -{ - graph - stream Puncted = Custom(InStream) - { - logic - onTuple InStream : submit(InStream, Puncted) ; - onPunct InStream : - { - if(currentPunct() == Sys.FinalMarker) - { - submit(Sys.WindowMarker, Puncted) ; - submit(Sys.FinalMarker, Puncted) ; - } - } - - } - - stream WordCount = Aggregate(Puncted as I) - { - window - I : tumbling, punct(), partitioned ; - param - partitionBy : word ; - output - WordCount : count = Sum(count) ; - } - - () as sink = FileSink(WordCount) - { - param - file : "combine_words" +(rstring) getChannel() + ".txt" ; - } - -} - -composite WordCountCombine -{ - param - expression $numMappers :(int32) getSubmissionTimeValue("numMappers", - "1") ; - expression $numReducers :(int32) getSubmissionTimeValue("numReducers", - "1") ; - graph - @parallel(width = $numMappers) - stream WordStream = ReadTokenizeCombine() - { - param - inputFile : getSubmissionTimeValue("inputFile", "allEnron.txt") ; - config - placement : partitionColocation("map") ; - } - - @parallel(width = $numReducers, partitionBy = [ { port = WordStream, - attributes = [ word ] } ]) - stream WordCount = ReduceFromCombine(WordStream) - { - config - placement : partitionColocation("reduce") ; - } - -} - diff --git a/demos/WordCount/info.xml b/demos/WordCount/info.xml deleted file mode 100644 index 8c30255..0000000 --- a/demos/WordCount/info.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - WordCount - Show how to implement WordCount and WordCountWithCombine - 1.0.0 - 4.0 - - - - com.ibm.streamsx.hdfs - 2.1 - - - diff --git a/tests/InputFormatReader/ReadSequenceParallel/Main.spl b/tests/InputFormatReader/ReadSequenceParallel/Main.spl deleted file mode 100644 index 946e375..0000000 --- a/tests/InputFormatReader/ReadSequenceParallel/Main.spl +++ /dev/null @@ -1,41 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ -use com.ibm.streamsx.hdfs::InputFormatReader; -use hdfs.test.common::CheckCount; - -/** - * Use to check input file reading for both compressed and uncompressed files. - * Also checks that key-value output, key only, and value only output works. 
- */ - -composite ReadAndCount { - -param expression $inputFile: getSubmissionTimeValue("inputFile"); -expression $configFile: getSubmissionTimeValue("configFile"); -expression> $recordCountArray: (list)getSubmissionTimeValue("recordCountArray"); - -graph - -stream Both = InputFormatReader() { -param -file: $inputFile; -configResources: $configFile; -fileType:sequence; -} - -() as bothCheck = CheckCount(Both) { -param -tupleCount: $recordCountArray[getChannel()]; -} -} - -composite Main { - -graph -@parallel(width=2) -() as channel = ReadAndCount() { - -} - -} - diff --git a/tests/InputFormatReader/ReadSequenceSingle/Main.spl b/tests/InputFormatReader/ReadSequenceSingle/Main.spl deleted file mode 100644 index aecdad5..0000000 --- a/tests/InputFormatReader/ReadSequenceSingle/Main.spl +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ - -use com.ibm.streamsx.hdfs::InputFormatReader; -use hdfs.test.common::CheckCount; - -/** - * Use to check input file reading for both compressed and uncompressed files. - * Also checks that key-value output, key only, and value only output works. - */ - -composite Main { - -param expression $inputFile: getSubmissionTimeValue("inputFile"); -expression $configFile: getSubmissionTimeValue("configFile"); -expression $recordCount: (int32)getSubmissionTimeValue("recordCount"); - -graph - -stream Both = InputFormatReader() { -param -file: $inputFile; -configResources: $configFile; -fileType:sequence; -} - -() as bothCheck = CheckCount(Both) { -param -tupleCount: $recordCount; -} - - -} diff --git a/tests/InputFormatReader/ReadTextParallel/Main.spl b/tests/InputFormatReader/ReadTextParallel/Main.spl deleted file mode 100644 index fce3e33..0000000 --- a/tests/InputFormatReader/ReadTextParallel/Main.spl +++ /dev/null @@ -1,41 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ - -use com.ibm.streamsx.hdfs::InputFormatReader; -use hdfs.test.common::CheckCount; - -/** - * Use to check input file reading for both compressed and uncompressed files. - * Also checks that key-value output, key only, and value only output works. - */ - -composite ReadAndCount { - -param expression $inputFile: getSubmissionTimeValue("inputFile"); -expression $configFile: getSubmissionTimeValue("configFile"); -expression> $recordCountArray: (list)getSubmissionTimeValue("recordCountArray"); - -graph - -stream Both = InputFormatReader() { -param -file: $inputFile; -configResources: $configFile; -} - -() as bothCheck = CheckCount(Both) { -param -tupleCount: $recordCountArray[getChannel()]; -} -} - -composite Main { - -graph -@parallel(width=2) -() as channel = ReadAndCount() { - -} - -} - diff --git a/tests/InputFormatReader/ReadTextSingle/Main.spl b/tests/InputFormatReader/ReadTextSingle/Main.spl deleted file mode 100644 index 75f4480..0000000 --- a/tests/InputFormatReader/ReadTextSingle/Main.spl +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ - -use com.ibm.streamsx.hdfs::InputFormatReader; -use hdfs.test.common::CheckCount; - -/** - * Use to check input file reading for both compressed and uncompressed files. - * Also checks that key-value output, key only, and value only output works. 
- */ - -composite Main { - -param expression $inputFile: getSubmissionTimeValue("inputFile"); -expression $configFile: getSubmissionTimeValue("configFile"); -expression $recordCount: (int32)getSubmissionTimeValue("recordCount"); - -graph - -stream Both = InputFormatReader() { -param -file: $inputFile; -configResources: $configFile; -} - -() as bothCheck = CheckCount(Both) { -param -tupleCount: $recordCount; -} - -stream ValueOnly = InputFormatReader() { -param file: $inputFile; -configResources: $configFile; -} - -() as valueCheck = CheckCount(ValueOnly) { -param -tupleCount: $recordCount; -} - - -stream KeyOnly = InputFormatReader() { -param file: $inputFile; -configResources: $configFile; -} - -() as keyCheck = CheckCount(KeyOnly) { -param tupleCount: $recordCount; -} - -} diff --git a/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl b/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl deleted file mode 100644 index 0eabddd..0000000 --- a/tests/hdfs.test.common/hdfs.test.common/CheckCount.spl +++ /dev/null @@ -1,27 +0,0 @@ -/* Copyright (C) 2015, International Business Machines Corporation */ -/* All Rights Reserved */ - -namespace hdfs.test.common; - -public composite CheckCount(input InStream) { - -param -expression $tupleCount; - -graph -() as checker = Custom(InStream) { - -logic state: { - mutable int32 numTuples = 0; - int32 targetNumber = $tupleCount; -} -onTuple InStream: { - numTuples++; -} -onPunct InStream: - if (currentPunct() == Sys.FinalMarker) { - assert(numTuples==targetNumber, "Received "+(rstring)numTuples+" but expected "+(rstring)targetNumber); - } -} - -}