Skip to content

Commit

Permalink
Merge pull request #164 from IBMStreams/issue162
Browse files Browse the repository at this point in the history
Fixes #162 for master branch
  • Loading branch information
hildrum committed Oct 2, 2015
2 parents 7602976 + ff53379 commit 826e61d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ void MY_OPERATOR::process(uint32_t idx)
SPLAPPTRC(L_DEBUG, "Processing...", "InetSource");

OPort0Type tuple;
string record;
istringstream retrievalBuffer;
int recordCounter = 0;
<% if($doNotStreamInitialFetch) { %>
Expand Down Expand Up @@ -323,12 +322,17 @@ void MY_OPERATOR::process(uint32_t idx)
*/

// Start of loop here, one loop cycle per input record, until retrieval buffer is exhausted
getline(retrievalBuffer, record);
if(record.size() > 0 || !retrievalBuffer.eof())
{
do
{
if(inputLinesPerRecord_ > 1)

while(!retrievalBuffer.eof()) {
string record;
getline(retrievalBuffer,record);

// if there's no data left, we should exit this loop
if(retrievalBuffer.eof() && record.size() == 0) {
break;
}

if(inputLinesPerRecord_ > 1)
{
/*
* Here, the user asked for multiple lines per record, so append the contents of the additional
Expand Down Expand Up @@ -432,9 +436,7 @@ void MY_OPERATOR::process(uint32_t idx)
else break; // when there is no fragmentation, we are done
}
<% } %>
if(!retrievalBuffer.eof()) getline(retrievalBuffer, record);
} while(!retrievalBuffer.eof());
} // end of if(record.size() > 0 || !retrievalBuffer.eof())
}
<% if($outputTypeIsList && $emitTuplePerURI) { %>
/*
* Here, emitTuplePerURI was requested, so flush the internal list buffer(s) into its(their)
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.inet/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit
4. Start the InfoSphere Streams instance.
5. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.
</description>
<version>2.6.0</version>
<version>2.6.1</version>
<requiredProductVersion>4.0.0.0</requiredProductVersion>
</identity>
<dependencies/>
Expand Down
40 changes: 40 additions & 0 deletions tests/InetSource/NoNewlineAtEnd/Main.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

/**
* This composite is meant to be run as a test. If compiled in standalone mode and executed,
* it will give return 0 if the test is successful, and non-zero if the test fails. It should
* be able to be invoked by any test harness.
*/

composite Main {

graph
stream<rstring result> NoNewline = com.ibm.streamsx.inet::InetSource() {
param
// the data returned by this URL does not have a newline at the end.
URIList: ["http://services.faa.gov/airport/status/ATL?format=application/xml"];
fetchInterval: 5.0;

}

// make sure we got the last line.
() as checkNoNewline = Custom(NoNewline) {
logic state: {
mutable int32 numEnter = 0;
mutable int32 numExits = 0;
}
onTuple NoNewline: {
if (size(regexMatch(result,"\\s*<AirportStatus>\\s*")) > 0) {
numEnter++;
}
else if (size(regexMatch(result,"\\s*</AirportStatus\\s*"))>0) {
numExits++;
}
if (numEnter-numExits > 1) {
abort();
}
if (numEnter == numExits && numExits > 0) {
shutdownPE();
}
}
}
}

0 comments on commit 826e61d

Please sign in to comment.