diff --git a/com.ibm.streamsx.inet/com.ibm.streamsx.inet/InetSource/InetSource_cpp.cgt b/com.ibm.streamsx.inet/com.ibm.streamsx.inet/InetSource/InetSource_cpp.cgt index 40dd6b0..a64228c 100644 --- a/com.ibm.streamsx.inet/com.ibm.streamsx.inet/InetSource/InetSource_cpp.cgt +++ b/com.ibm.streamsx.inet/com.ibm.streamsx.inet/InetSource/InetSource_cpp.cgt @@ -224,7 +224,6 @@ void MY_OPERATOR::process(uint32_t idx) SPLAPPTRC(L_DEBUG, "Processing...", "InetSource"); OPort0Type tuple; - string record; istringstream retrievalBuffer; int recordCounter = 0; <% if($doNotStreamInitialFetch) { %> @@ -323,12 +322,17 @@ void MY_OPERATOR::process(uint32_t idx) */ // Start of loop here, one loop cycle per input record, until retrieval buffer is exhausted - getline(retrievalBuffer, record); - if(record.size() > 0 || !retrievalBuffer.eof()) - { - do - { - if(inputLinesPerRecord_ > 1) + + while(!retrievalBuffer.eof()) { + string record; + getline(retrievalBuffer,record); + + // if there's no data left, we should exit this loop + if(retrievalBuffer.eof() && record.size() == 0) { + break; + } + + if(inputLinesPerRecord_ > 1) { /* * Here, the user asked for multiple lines per record, so append the contents of the additional @@ -432,9 +436,7 @@ void MY_OPERATOR::process(uint32_t idx) else break; // when there is no fragmentation, we are done } <% } %> - if(!retrievalBuffer.eof()) getline(retrievalBuffer, record); - } while(!retrievalBuffer.eof()); - } // end of if(record.size() > 0 || !retrievalBuffer.eof()) + } <% if($outputTypeIsList && $emitTuplePerURI) { %> /* * Here, emitTuplePerURI was requested, so flush the internal list buffer(s) into its(their) diff --git a/com.ibm.streamsx.inet/info.xml b/com.ibm.streamsx.inet/info.xml index 7d54955..21861b0 100644 --- a/com.ibm.streamsx.inet/info.xml +++ b/com.ibm.streamsx.inet/info.xml @@ -48,7 +48,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit 4. Start the InfoSphere Streams instance. 5. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio. - 2.6.0 + 2.6.1 4.0.0.0 diff --git a/tests/InetSource/NoNewlineAtEnd/Main.spl b/tests/InetSource/NoNewlineAtEnd/Main.spl new file mode 100644 index 0000000..5d8217e --- /dev/null +++ b/tests/InetSource/NoNewlineAtEnd/Main.spl @@ -0,0 +1,40 @@ + +/** + * This composite is meant to be run as a test. If compiled in standalone mode and executed, + * it will give return 0 if the test is successful, and non-zero if the test fails. It should + * be able to be invoked by any test harness. + */ + +composite Main { + +graph + stream NoNewline = com.ibm.streamsx.inet::InetSource() { + param + // the data returned by this URL does not have a newline at the end. + URIList: ["http://services.faa.gov/airport/status/ATL?format=application/xml"]; + fetchInterval: 5.0; + + } + + // make sure we got the last line. + () as checkNoNewline = Custom(NoNewline) { + logic state: { + mutable int32 numEnter = 0; + mutable int32 numExits = 0; + } + onTuple NoNewline: { + if (size(regexMatch(result,"\\s*\\s*")) > 0) { + numEnter++; + } + else if (size(regexMatch(result,"\\s*0) { + numExits++; + } + if (numEnter-numExits > 1) { + abort(); + } + if (numEnter == numExits && numExits > 0) { + shutdownPE(); + } + } + } +}