Skip to content

Commit

Permalink
fix merge in version
Browse files Browse the repository at this point in the history
  • Loading branch information
hildrum committed Oct 4, 2015
2 parents b6f663f + ed3e8a4 commit 0e14ee4
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,25 @@
<function:prototype cppName="">public rstring httpGet(rstring url,list&lt;rstring> extraHeaders, rstring user, rstring password, mutable int32 error)</function:prototype>
</function:function>
<function:function>
<function:description>HTTP DELETE on the given url, using the username and password, if present. It follows redirects. If there is an error, a non-zero is returned by error.</function:description>
<function:prototype cppName="">public rstring httpDelete(rstring url,list&lt;rstring> extraHeaders, rstring user, rstring password, mutable int32 error)</function:prototype>
</function:function>
<function:function>
<function:description>HTTP PUT data to the given url using the username and password given if not empty. Does not follow redirects. Headers are returned in the headers list, error is set to a non-zero if there is an error, and the result of the PUT is returned in the rstring.</function:description>
<function:prototype>public rstring httpPut(rstring data, rstring url, list&lt;rstring> extraHeaders, rstring username, rstring password, mutable list&lt;rstring> headers, mutable int32 error)</function:prototype>
</function:function>
<function:function>
<function:description>HTTP POST data to the given url using the username and password (if non-empty). Does not follow redirects. Headers are returned in the headers list. Error is set to non-zero if there is an error. The result of the POST is returned as an rstring. </function:description>
<function:prototype>public rstring httpPost(rstring data, rstring url, list&lt;rstring> extraHeaders, rstring username, rstring password, mutable list&lt;rstring> headers, mutable int32 error)</function:prototype>
</function:function>
<function:function>
<function:description>Decode a URL encoded rstring.</function:description>
<function:prototype>public rstring urlDecode(rstring decode)</function:prototype>
</function:function>
<function:function>
<function:description>URL encode the given rstring.</function:description>
<function:prototype>public rstring urlEncode(rstring raw)</function:prototype>
</function:function>
</function:functions>
<function:dependencies>
<function:library>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ void MY_OPERATOR::process(uint32_t idx)
SPLAPPTRC(L_DEBUG, "Processing...", "InetSource");

OPort0Type tuple;
string record;
istringstream retrievalBuffer;
int recordCounter = 0;
<% if($doNotStreamInitialFetch) { %>
Expand Down Expand Up @@ -323,12 +322,17 @@ void MY_OPERATOR::process(uint32_t idx)
*/

// Start of loop here, one loop cycle per input record, until retrieval buffer is exhausted
getline(retrievalBuffer, record);
if(record.size() > 0 || !retrievalBuffer.eof())
{
do
{
if(inputLinesPerRecord_ > 1)

while(!retrievalBuffer.eof()) {
string record;
getline(retrievalBuffer,record);

// if there's no data left, we should exit this loop
if(retrievalBuffer.eof() && record.size() == 0) {
break;
}

if(inputLinesPerRecord_ > 1)
{
/*
* Here, the user asked for multiple lines per record, so append the contents of the additional
Expand Down Expand Up @@ -432,9 +436,7 @@ void MY_OPERATOR::process(uint32_t idx)
else break; // when there is no fragmentation, we are done
}
<% } %>
if(!retrievalBuffer.eof()) getline(retrievalBuffer, record);
} while(!retrievalBuffer.eof());
} // end of if(record.size() > 0 || !retrievalBuffer.eof())
}
<% if($outputTypeIsList && $emitTuplePerURI) { %>
/*
* Here, emitTuplePerURI was requested, so flush the internal list buffer(s) into its(their)
Expand Down
7 changes: 7 additions & 0 deletions com.ibm.streamsx.inet/impl/cpp/include/httpFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ size_t populate_rstring(char *ptr,size_t size, size_t nmemb, void*userdata);

SPL::rstring httpGet(const SPL::rstring & url, const SPL::list<SPL::rstring> & extraHeaders, const SPL::rstring & username, const SPL::rstring & password,SPL::int32 & error);

SPL::rstring httpDelete(const SPL::rstring & url, const SPL::list<SPL::rstring> & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::int32 & error);

SPL::rstring httpPut(const SPL::rstring & data, const SPL::rstring & url, const SPL::list<SPL::rstring> & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list<SPL::rstring>& headers, SPL::int32 & error);

SPL::rstring httpPost(const SPL::rstring & data, const SPL::rstring & url, const SPL::list<SPL::rstring> & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list<SPL::rstring>& headers, SPL::int32 & error);


SPL::rstring urlEncode(const SPL::rstring & raw);

SPL::rstring urlDecode(const SPL::rstring & encoded);


}
#endif
122 changes: 92 additions & 30 deletions com.ibm.streamsx.inet/impl/cpp/src/httpFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ __attribute__((destructor)) void cleanupCurl() {
void addCurlHandle(CURL * handle) {

if (activeCurlPointers == NULL) {
activeCurlPointers = new std::vector<CURL*>(3);
activeCurlPointers = new std::vector<CURL*>(6);
}
activeCurlPointers->push_back(handle);

Expand Down Expand Up @@ -124,12 +124,17 @@ CURLcode addCommonOpts(CURL * curl, const SPL::rstring & url, const SPL::list<SP

if (extraHeaders.size() > 0) {
struct curl_slist * extraHeadersSlist = getSList(extraHeaders);
curl_easy_setopt(curl,CURLOPT_HTTPHEADER,extraHeadersSlist);
res = curl_easy_setopt(curl,CURLOPT_HTTPHEADER,extraHeadersSlist);
if (res!= CURLE_OK) {
return res;
}
}

else {
res = curl_easy_setopt(curl,CURLOPT_HTTPHEADER,NULL);
}
if (res != CURLE_OK) {
return res;
}

res=curl_easy_setopt(curl,CURLOPT_SSL_VERIFYPEER,0);
if (res != CURLE_OK) {
Expand All @@ -154,6 +159,47 @@ CURLcode addCommonOpts(CURL * curl, const SPL::rstring & url, const SPL::list<SP
return CURLE_OK;
}

SPL::rstring urlEncode(const SPL::rstring & raw) {
static __thread CURL* encode = NULL;
if (encode == NULL) {
encode = curl_easy_init();
addCurlHandle(encode);
}
char * result = curl_easy_escape(encode,raw.data(),raw.size());
SPL::rstring toReturn(result);
curl_free(result);
return toReturn;
}


SPL::rstring urlDecode(const SPL::rstring & encoded) {
static __thread CURL* decode = NULL;
if (decode == NULL) {
decode = curl_easy_init();
addCurlHandle(decode);
}
int length = 0;
char * result = curl_easy_unescape(decode,encoded.data(), encoded.size(),&length);
SPL::rstring toReturn(result,length);
curl_free(result);
return toReturn;
}

CURLcode readResultAsRstring(CURL* curl , SPL::rstring & result) {
// Now handle read, for error checking and exception handling
SPL::rstring toReturn;
CURLcode res = curl_easy_setopt(curl,CURLOPT_WRITEDATA,&result);
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << "setting data pointer", logTag);
return res;
}
res = curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,&(populate_rstring));
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << " setting write function", logTag);
}
return res;
}

class RstringAndIndex {
public:
const SPL::rstring * theData;
Expand All @@ -177,6 +223,38 @@ size_t readFromRstring(char * buffer, size_t size, size_t nitems, void *instream
return i;
}

SPL::rstring httpDelete(const SPL::rstring & url, const SPL::list<SPL::rstring> &extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list<SPL::rstring> & headers, SPL::int32 & error) {

static __thread CURL* curlDelete = NULL;
error = 0;
if (curlDelete == NULL) {
curlDelete = curl_easy_init();
addCurlHandle(curlDelete);
}
headers.clear();
CURLcode res=addCommonOpts(curlDelete, url, extraHeaders, username, password);
if (res != CURLE_OK) {
error = res;
return "";
}
res = curl_easy_setopt(curlDelete,CURLOPT_CUSTOMREQUEST,"DELETE");
if (res != CURLE_OK) {
error = res;
return "";
}
SPL::rstring toReturn;
res = readResultAsRstring(curlDelete,toReturn);
if (res != CURLE_OK) {
error = res;
return "";
}
res = curl_easy_perform(curlDelete);
if (res != CURLE_OK) {
error = res;
return "";
}
}

SPL::rstring httpPost(const SPL::rstring & data, const SPL::rstring & url, const SPL::list<SPL::rstring> & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list<SPL::rstring> & headers,SPL::int32 & error) {
static __thread CURL* curlPost =NULL;
error = 0;
Expand Down Expand Up @@ -221,20 +299,17 @@ SPL::rstring httpPost(const SPL::rstring & data, const SPL::rstring & url, const

// Now handle read, for error checking and exception handling
SPL::rstring toReturn;
res = curl_easy_setopt(curlPost,CURLOPT_WRITEDATA,&toReturn);
res = readResultAsRstring(curlPost,toReturn);
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << "setting data pointer", logTag);
error =res;
return "";
error = res;
return "";
}
res = curl_easy_setopt(curlPost,CURLOPT_WRITEFUNCTION,&(populate_rstring));
res = curl_easy_setopt(curlPost,CURLOPT_FOLLOWLOCATION,0);
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << " setting write function", logTag);
error = res;
return "";
error = res;
return "";
}

curl_easy_setopt(curlPost,CURLOPT_FOLLOWLOCATION,0);
SPLAPPTRC(L_DEBUG,"About to perform",logTag);
// ALL DONE! Do action.
res = curl_easy_perform(curlPost);
Expand Down Expand Up @@ -309,17 +384,10 @@ SPL::rstring httpPut(const SPL::rstring & data, const SPL::rstring & url, const
SPLAPPTRC(L_TRACE,"About to perform",logTag);
// Now handle read, for error checking and exception handling
SPL::rstring toReturn;
res = curl_easy_setopt(curlPut,CURLOPT_WRITEDATA,&toReturn);
res = readResultAsRstring(curlPut,toReturn);
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << "setting data pointer", logTag);
error =res;
return "";
}
res = curl_easy_setopt(curlPut,CURLOPT_WRITEFUNCTION,&(populate_rstring));
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << " setting write function", logTag);
error = res;
return "";
error = res;
return "";
}

// ALL DONE! Do action.
Expand Down Expand Up @@ -353,15 +421,9 @@ SPL::rstring httpGet(const SPL::rstring & url, const SPL::list<SPL::rstring> & e
error = res;
return toReturn;
}
res = curl_easy_setopt(curlGet,CURLOPT_WRITEDATA,&toReturn);
if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << "setting data pointer", logTag);
error = res;
return toReturn;
}
res = curl_easy_setopt(curlGet,CURLOPT_WRITEFUNCTION,&(populate_rstring));
res = readResultAsRstring(curlGet,toReturn);

if (res != CURLE_OK) {
SPLAPPTRC(L_ERROR, "Error " << res << " setting write function", logTag);
error = res;
return toReturn;
}
Expand Down
4 changes: 2 additions & 2 deletions com.ibm.streamsx.inet/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<toolkitInfoModel xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo" xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo toolkitInfoModel.xsd">
<identity>
<name>com.ibm.streamsx.inet</name>
<description>
<description>
The Internet Toolkit provides support for common internet protocols.

This toolkit separates its functionality into a number of namespaces:
Expand Down Expand Up @@ -48,7 +48,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit
4. Start the InfoSphere Streams instance.
5. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.
</description>
<version>2.7</version>
<version>2.7.0</version>
<requiredProductVersion>4.0.0.0</requiredProductVersion>
</identity>
<dependencies/>
Expand Down
40 changes: 40 additions & 0 deletions tests/InetSource/NoNewlineAtEnd/Main.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

/**
* This composite is meant to be run as a test. If compiled in standalone mode and executed,
* it will give return 0 if the test is successful, and non-zero if the test fails. It should
* be able to be invoked by any test harness.
*/

composite Main {

graph
stream<rstring result> NoNewline = com.ibm.streamsx.inet::InetSource() {
param
// the data returned by this URL does not have a newline at the end.
URIList: ["http://services.faa.gov/airport/status/ATL?format=application/xml"];
fetchInterval: 5.0;

}

// make sure we got the last line.
() as checkNoNewline = Custom(NoNewline) {
logic state: {
mutable int32 numEnter = 0;
mutable int32 numExits = 0;
}
onTuple NoNewline: {
if (size(regexMatch(result,"\\s*<AirportStatus>\\s*")) > 0) {
numEnter++;
}
else if (size(regexMatch(result,"\\s*</AirportStatus\\s*"))>0) {
numExits++;
}
if (numEnter-numExits > 1) {
abort();
}
if (numEnter == numExits && numExits > 0) {
shutdownPE();
}
}
}
}
4 changes: 2 additions & 2 deletions tests/tests.http/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
# *******************************************************************************
# * Copyright (C)2014, International Business Machines Corporation and *
# * Copyright (C)2014-2015, International Business Machines Corporation and *
# * others. All Rights Reserved. *
# *******************************************************************************
#
Expand All @@ -10,7 +10,7 @@ args=-t ../../com.ibm.streamsx.inet
ns=com.ibm.streamsx.inet.http.tests
outputdir=./output

tests=HTTPStreamBasicFunctionTestMain HTTPStreamBadUrlTestMain HTTPStreamConsistentRegionFailMain HTTPPostBasicFunctionTestMain HTTPPostConsistentRegionFailMain
tests=HTTPStreamBasicFunctionTestMain HTTPStreamBadUrlTestMain HTTPStreamConsistentRegionFailMain HTTPPostBasicFunctionTestMain HTTPPostConsistentRegionFailMain URLEncodeTestMain

rntest=./scripts/testRunner.sh
ftest=./scripts/expectFail.sh
Expand Down
33 changes: 33 additions & 0 deletions tests/tests.http/com.ibm.streamsx.inet.http.tests/URLEncode.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace com.ibm.streamsx.inet.http.tests;
use com.ibm.streamsx.inet.http::urlEncode;
use com.ibm.streamsx.inet.http::urlDecode;

composite URLEncodeTestMain {

graph
() as tester = Custom() {

logic onProcess: {

rstring raw = "This has spaces and newline \n.";
//println(raw);
rstring encoded = urlEncode(raw);
//println(encoded);
rstring decoded = urlDecode(encoded);
if (decoded != raw) {
abort();
}
rstring raw2 = "This has a null \0 now stuff after the null";
//println(raw2);
rstring encoded2 = urlEncode(raw2);
//println(encoded2);
rstring decoded2 = urlDecode(encoded2);
if (raw2 != decoded2) {
abort();
}

}

}

}

0 comments on commit 0e14ee4

Please sign in to comment.