diff --git a/.gitignore b/.gitignore index d6dc5c9..d063cc6 100644 --- a/.gitignore +++ b/.gitignore @@ -9,8 +9,10 @@ impl/nl/include/*.h /com.ibm.streamsx.inet/impl/nl/InetResource.dat # generated operator models -/com.ibm.streamsx.inet/com.ibm.streamsx.inet.wsserver/WebSocketInject/WebSocketInject.xml -/com.ibm.streamsx.inet/com.ibm.streamsx.inet.wsserver/WebSocketSend/WebSocketSend.xm +/com.ibm.streamsx.inet/com.ibm.streamsx.inet.wsserver/WebSocketInject/* +/com.ibm.streamsx.inet/com.ibm.streamsx.inet.wsserver/WebSocketSend/* +/com.ibm.streamsx.inet/com.ibm.streamsx.inet.rest/*/*.gif +/com.ibm.streamsx.inet/com.ibm.streamsx.inet.http/*/*.gif samples/*/output samples/*/toolkit.xml diff --git a/com.ibm.streamsx.inet/build.xml b/com.ibm.streamsx.inet/build.xml index ecec1fd..ea8ecf4 100644 --- a/com.ibm.streamsx.inet/build.xml +++ b/com.ibm.streamsx.inet/build.xml @@ -92,7 +92,7 @@ artifacts were left around and caused issues with the ant build. - + @@ -100,7 +100,7 @@ artifacts were left around and caused issues with the ant build. - + diff --git a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPCommand/FTPCommand.xml b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPCommand/FTPCommand.xml index 2a11a28..075dccd 100644 --- a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPCommand/FTPCommand.xml +++ b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPCommand/FTPCommand.xml @@ -78,7 +78,7 @@ The command string and the command arguments are received from port 0. FTP wrapper lib - ftpwrapper + inettoolkit ../../impl/lib ../../impl/cpp/include/libftp diff --git a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPPutFile/FTPPutFile.xml b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPPutFile/FTPPutFile.xml index dc40071..47db7c7 100644 --- a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPPutFile/FTPPutFile.xml +++ b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPPutFile/FTPPutFile.xml @@ -104,7 +104,7 @@ You can optionally rename the file after you complete the transfer. FTP wrapper lib - ftpwrapper + inettoolkit ../../impl/lib ../../impl/cpp/include/libftp diff --git a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPReader/FTPReader.xml b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPReader/FTPReader.xml index 31ff214..bef3512 100644 --- a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPReader/FTPReader.xml +++ b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.ftp/FTPReader/FTPReader.xml @@ -127,7 +127,7 @@ character. An complete empty line indicates an empty file. This function must no FTP wrapper lib - ftpwrapper + inettoolkit ../../impl/lib ../../impl/cpp/include/libftp diff --git a/com.ibm.streamsx.inet/com.ibm.streamsx.inet.http/native.function/function.xml b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.http/native.function/function.xml new file mode 100644 index 0000000..e3be3b3 --- /dev/null +++ b/com.ibm.streamsx.inet/com.ibm.streamsx.inet.http/native.function/function.xml @@ -0,0 +1,44 @@ + + + + httpFunctions.h + com_ibm_streamsx_inet_http + + + HTTP GET on the given url, using the username and password, if present. It follows redirects. If there is an error, a non-zero is returned by error. + public rstring httpGet(rstring url,list<rstring> extraHeaders, rstring user, rstring password, mutable int32 error) + + + HTTP DELETE on the given url, using the username and password, if present. It follows redirects. If there is an error, a non-zero is returned by error. + public rstring httpDelete(rstring url,list<rstring> extraHeaders, rstring user, rstring password, mutable int32 error) + + + HTTP PUT data to the given url using the username and password given if not empty. Does not follow redirects. Headers are returned in the headers list, error is set to a non-zero if there is an error, and the result of the PUT is returned in the rstring. + public rstring httpPut(rstring data, rstring url, list<rstring> extraHeaders, rstring username, rstring password, mutable list<rstring> headers, mutable int32 error) + + + HTTP POST data to the given url using the username and password (if non-empty). Does not follow redirects. Headers are returned in the headers list. Error is set to non-zero if there is an error. The result of the POST is returned as an rstring. + public rstring httpPost(rstring data, rstring url, list<rstring> extraHeaders, rstring username, rstring password, mutable list<rstring> headers, mutable int32 error) + + + Decode a URL encoded rstring. + public rstring urlDecode(rstring decode) + + + URL encode the given rstring. + public rstring urlEncode(rstring raw) + + + + + + + curl + inettoolkit + ../../impl/lib + ../../impl/cpp/include + + + + + diff --git a/com.ibm.streamsx.inet/impl/Makefile.libftp b/com.ibm.streamsx.inet/impl/Makefile similarity index 94% rename from com.ibm.streamsx.inet/impl/Makefile.libftp rename to com.ibm.streamsx.inet/impl/Makefile index bb8862e..ceff580 100644 --- a/com.ibm.streamsx.inet/impl/Makefile.libftp +++ b/com.ibm.streamsx.inet/impl/Makefile @@ -19,10 +19,10 @@ SPL_LINK_OPTIONS = $(shell $(SPL_PKGCFG) --libs $(SPL_PKG)) # Folders # ----------------------------------------------------------------------------- TRG := . -INC:= $(TRG)/cpp/include/libftp +INC:= $(TRG)/cpp/include LIB := $(TRG)/lib -OBJ := $(TRG)/cpp/obj/libftp -SRC := $(TRG)/cpp/src/libftp +OBJ := $(TRG)/cpp/bin +SRC := $(TRG)/cpp/src # ----------------------------------------------------------------------------- # phony targets are targets being not in the filesystem but are always "active" @@ -34,7 +34,7 @@ SRC := $(TRG)/cpp/src/libftp # Library # ----------------------------------------------------------------------------- -LIBRARY := $(LIB)/libftpwrapper.so +LIBRARY := $(LIB)/libinettoolkit.so # ----------------------------------------------------------------------------- # Helpers @@ -55,7 +55,7 @@ $(LIB) $(OBJ): $(OBJ)/%.o: $(SRC)/%.cpp $(INCLUDES) @echo Compiling '$<' ... - $(CXX) $(CXXFLAGS) -c $< -o $@ + $(CXX) $(CXXFLAGS) -c $< -o $@ -I $(INC) $(LIBRARY): $(LIB) $(OBJ) $(OBJECTS) @echo Building C++ shared library '$@' @@ -78,4 +78,4 @@ help: @echo "all: clean build this is the default target" @echo "clean: remove library and object files" @echo "build: build the library" - \ No newline at end of file + diff --git a/com.ibm.streamsx.inet/impl/cpp/include/libftp/FTPWrapper.h b/com.ibm.streamsx.inet/impl/cpp/include/FTPWrapper.h similarity index 100% rename from com.ibm.streamsx.inet/impl/cpp/include/libftp/FTPWrapper.h rename to com.ibm.streamsx.inet/impl/cpp/include/FTPWrapper.h diff --git a/com.ibm.streamsx.inet/impl/cpp/include/httpFunctions.h b/com.ibm.streamsx.inet/impl/cpp/include/httpFunctions.h new file mode 100644 index 0000000..53718fa --- /dev/null +++ b/com.ibm.streamsx.inet/impl/cpp/include/httpFunctions.h @@ -0,0 +1,28 @@ +#ifndef HTTP_FOR_STREAMS +#define HTTP_FOR_STREAMS +#include "curl/curl.h" +#include +#include + +namespace com_ibm_streamsx_inet_http { + + +// We're just writing bytes. +size_t populate_rstring(char *ptr,size_t size, size_t nmemb, void*userdata); + +SPL::rstring httpGet(const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password,SPL::int32 & error); + +SPL::rstring httpDelete(const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::int32 & error); + +SPL::rstring httpPut(const SPL::rstring & data, const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list& headers, SPL::int32 & error); + +SPL::rstring httpPost(const SPL::rstring & data, const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list& headers, SPL::int32 & error); + + +SPL::rstring urlEncode(const SPL::rstring & raw); + +SPL::rstring urlDecode(const SPL::rstring & encoded); + + +} +#endif diff --git a/com.ibm.streamsx.inet/impl/cpp/src/libftp/FTPWrapper.cpp b/com.ibm.streamsx.inet/impl/cpp/src/FTPWrapper.cpp similarity index 99% rename from com.ibm.streamsx.inet/impl/cpp/src/libftp/FTPWrapper.cpp rename to com.ibm.streamsx.inet/impl/cpp/src/FTPWrapper.cpp index 0208816..1c2ff8e 100644 --- a/com.ibm.streamsx.inet/impl/cpp/src/libftp/FTPWrapper.cpp +++ b/com.ibm.streamsx.inet/impl/cpp/src/FTPWrapper.cpp @@ -1,7 +1,7 @@ /* Copyright (C) 2013-2014, International Business Machines Corporation */ /* All Rights Reserved */ -#include "../../include/libftp/FTPWrapper.h" +#include "FTPWrapper.h" #include #include #include diff --git a/com.ibm.streamsx.inet/impl/cpp/src/httpFunctions.cpp b/com.ibm.streamsx.inet/impl/cpp/src/httpFunctions.cpp new file mode 100644 index 0000000..b814957 --- /dev/null +++ b/com.ibm.streamsx.inet/impl/cpp/src/httpFunctions.cpp @@ -0,0 +1,453 @@ +#include "httpFunctions.h" +#include "SPL/Runtime/Common/RuntimeDebug.h" +#include +#include +#include +namespace com_ibm_streamsx_inet_http { + + +typedef std::tr1::unordered_map,struct curl_slist*> HeaderCache; + +std::vector* activeCurlPointers = NULL; +const char* logTag = "httpForStreams"; + +static __thread HeaderCache* headerCache = NULL; + +__attribute__((constructor)) void initializeCurl() { + curl_global_init(CURL_GLOBAL_ALL); +} + +__attribute__((destructor)) void cleanupCurl() { + + + while (activeCurlPointers != NULL && activeCurlPointers->size() > 0 ){ + curl_easy_cleanup(activeCurlPointers->back()); + activeCurlPointers->pop_back(); + } + if (headerCache != NULL) { + for (HeaderCache::const_iterator del = headerCache->begin(); del != headerCache->end(); del++) { + curl_slist_free_all(del->second); + } + headerCache->clear(); + delete headerCache; + } + curl_global_cleanup(); +} + +// We save the set of active curl handles so we can clean them up on shutdown. +void addCurlHandle(CURL * handle) { + + if (activeCurlPointers == NULL) { + activeCurlPointers = new std::vector(6); + } + activeCurlPointers->push_back(handle); + +} + + +// I'm not sure caching these is actually faster, but it does mean +// that hte calling functions don't have to worry about freeing the memory. + +struct curl_slist * getSList(SPL::list theList) { + if (headerCache == NULL) { + headerCache = new HeaderCache(); + } + HeaderCache::iterator it = headerCache->find(theList); + if (it == headerCache->end()) { + SPLAPPTRC(L_DEBUG,"Adding to cache",logTag); + // Not cached, so build a new slist. + struct curl_slist * toReturn = NULL; + for ( SPL::list::const_iterator it = theList.begin(); it != theList.end(); it++) { + toReturn = curl_slist_append(toReturn,it->c_str()); + } + // Lots of saved stuff; lets clear out the cache. + if (headerCache->size() > 20) { + for (HeaderCache::const_iterator del = headerCache->begin(); del != headerCache->end(); del++) { + curl_slist_free_all(del->second); + } + headerCache->clear(); + } + std::pair res = headerCache->insert(std::make_pair,struct curl_slist *>(theList,toReturn)); + if (!res.second) { + SPLAPPTRC(L_ERROR,"Internal error",logTag); + } + return toReturn; + } + else { + SPLAPPTRC(L_DEBUG,"Using cached value for header list",logTag); + return it->second; + } +} + +const char * nameForPost="fromTuple"; +// create the blob with the bytes from the file. +size_t populate_blob(char *ptr,size_t size, size_t nmemb, void*userdata) { + SPLAPPTRC(L_DEBUG,"Populate blob called with " << size << " and nmemb " << nmemb,logTag); + SPL::blob * toReturn = (SPL::blob*)userdata; + if (toReturn->getSize() == 0) { + toReturn->setData((const unsigned char*)ptr,size*nmemb); + } + else { + toReturn->append((const unsigned char*)ptr,size*nmemb); + } + return size*nmemb; +} + +size_t populate_rstring(char *ptr,size_t size, size_t nmemb, void*userdata) { + SPLAPPTRC(L_DEBUG,"Populate blob called with " << size << " and nmemb " << nmemb,logTag); + SPL::rstring * toReturn = (SPL::rstring*)userdata; + if (toReturn->size() == 0) { + toReturn->assign((const char*)ptr,size*nmemb); + } + else { + toReturn->append((const char*)ptr,size*nmemb); + } + return size*nmemb; +} + +// Just put the header as-is into the list. +size_t addToHeaderList(char * buffer, size_t size, size_t nitems, void * userdata) { + SPL::list * theList = (SPL::list*)userdata; + SPL::rstring newString(buffer,size*nitems); + theList->push_back(newString); + return size*nitems; +} + +CURLcode addCommonOpts(CURL * curl, const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password) { + + CURLcode res = curl_easy_setopt(curl,CURLOPT_URL,url.c_str()); + + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " setting URL", logTag); + return res; + } + + if (extraHeaders.size() > 0) { + struct curl_slist * extraHeadersSlist = getSList(extraHeaders); + res = curl_easy_setopt(curl,CURLOPT_HTTPHEADER,extraHeadersSlist); + if (res!= CURLE_OK) { + return res; + } + } + else { + res = curl_easy_setopt(curl,CURLOPT_HTTPHEADER,NULL); + } + if (res != CURLE_OK) { + return res; + } + + res=curl_easy_setopt(curl,CURLOPT_SSL_VERIFYPEER,0); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " setting no-verify", logTag); + return res; + } + + if (username.length() > 0 ) { + CURLcode res=curl_easy_setopt(curl,CURLOPT_USERNAME,(char*)(username.c_str())); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " setting username", logTag); + return res; + } + } + if (password.length() > 0) { + CURLcode res = curl_easy_setopt(curl,CURLOPT_PASSWORD,(char*)(password.c_str())); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " setting password", logTag); + return res; + } + } + return CURLE_OK; +} + +SPL::rstring urlEncode(const SPL::rstring & raw) { +static __thread CURL* encode = NULL; +if (encode == NULL) { + encode = curl_easy_init(); + addCurlHandle(encode); + } +char * result = curl_easy_escape(encode,raw.data(),raw.size()); +SPL::rstring toReturn(result); +curl_free(result); +return toReturn; +} + + +SPL::rstring urlDecode(const SPL::rstring & encoded) { +static __thread CURL* decode = NULL; +if (decode == NULL) { + decode = curl_easy_init(); + addCurlHandle(decode); +} +int length = 0; +char * result = curl_easy_unescape(decode,encoded.data(), encoded.size(),&length); +SPL::rstring toReturn(result,length); +curl_free(result); +return toReturn; +} + +CURLcode readResultAsRstring(CURL* curl , SPL::rstring * resultPtr) { + // Now handle read, for error checking and exception handling + SPL::rstring toReturn; + CURLcode res = curl_easy_setopt(curl,CURLOPT_WRITEDATA,resultPtr); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << "setting data pointer", logTag); + return res; + } + res = curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,&(populate_rstring)); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " setting write function", logTag); + } + return res; +} + +class RstringAndIndex { + public: + const SPL::rstring * theData; + size_t numSent; +}; + +size_t readFromRstring(char * buffer, size_t size, size_t nitems, void *instream) { + SPLAPPTRC(L_DEBUG,"readFromRstring size " << size << " nitems " << nitems, logTag); + RstringAndIndex* theStruct = (RstringAndIndex*)instream; + const SPL::rstring * toSend = theStruct->theData; + + uint32_t i = 0; + // probably can use mem copy. + for (; theStruct->numSent < toSend->length() && i < size*nitems; i++,theStruct->numSent++) { + buffer[i] = *(toSend->data() + theStruct->numSent); + } + if (size*nitems < toSend->length()) { + SPLAPPTRC(L_ERROR,"Size is " << size*nitems << " blob size is " << toSend->length(), logTag); + } + SPLAPPTRC(L_DEBUG,"sent " << i << " bytes, numSent is " << theStruct->numSent,logTag); + return i; +} + +SPL::rstring httpDelete(const SPL::rstring & url, const SPL::list &extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::int32 & error) { + +static __thread CURL* curlDelete = NULL; +error = 0; +if (curlDelete == NULL) { + curlDelete = curl_easy_init(); + addCurlHandle(curlDelete); +} + +CURLcode res=addCommonOpts(curlDelete, url, extraHeaders, username, password); +if (res != CURLE_OK) { + error = res; + return ""; +} +res = curl_easy_setopt(curlDelete,CURLOPT_CUSTOMREQUEST,"DELETE"); +if (res != CURLE_OK) { + error = res; + return ""; +} +SPL::rstring toReturn; +res = readResultAsRstring(curlDelete,&toReturn); +if (res != CURLE_OK) { + error = res; + return ""; +} +res = curl_easy_perform(curlDelete); +if (res != CURLE_OK) { + error = res; + return ""; +} +return toReturn; +} + +SPL::rstring httpPost(const SPL::rstring & data, const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list & headers,SPL::int32 & error) { + static __thread CURL* curlPost =NULL; + error = 0; + // curlPost is defined as a thread-local static variable. + + if (curlPost == NULL) { + curlPost = curl_easy_init(); + addCurlHandle(curlPost); + } + + headers.clear(); + CURLcode res = addCommonOpts(curlPost,url,extraHeaders,username, password); + if (res != CURLE_OK) { + error = res; + return ""; + } + res = curl_easy_setopt(curlPost,CURLOPT_HEADERDATA,(void*)&headers); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR,"Error code " << res << " setting header data",logTag); + error = res; + return ""; + } + res = curl_easy_setopt(curlPost,CURLOPT_HEADERFUNCTION,&addToHeaderList); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error code " << res << " setting header function", logTag); + error = res; + return ""; + } + + res = curl_easy_setopt(curlPost,CURLOPT_POSTFIELDS,data.data()); + if (res != CURLE_OK) { + error = res; + return ""; + } + curl_easy_setopt(curlPost,CURLOPT_POSTFIELDSIZE,data.length()); + if (res!= CURLE_OK) { + error = res; + return ""; + } + + + + // Now handle read, for error checking and exception handling + SPL::rstring toReturn; + res = readResultAsRstring(curlPost,&toReturn); + if (res != CURLE_OK) { + error = res; + return ""; + } + res = curl_easy_setopt(curlPost,CURLOPT_FOLLOWLOCATION,0); + if (res != CURLE_OK) { + error = res; + return ""; + } + + SPLAPPTRC(L_DEBUG,"About to perform",logTag); + // ALL DONE! Do action. + res = curl_easy_perform(curlPost); + + + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " on curl_easy_perform", logTag); + error =res; + return ""; + } + error = 0; // All went well! + return toReturn; + +} + + + +SPL::rstring httpPut(const SPL::rstring & data, const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::list & headers, SPL::int32 & error) { + static __thread CURL* curlPut = NULL; + // curlPut is a thread-local static variable. + + if (curlPut == NULL) { + curlPut = curl_easy_init(); + addCurlHandle(curlPut); + } + headers.clear(); + addCommonOpts(curlPut,url,extraHeaders,username, password); + CURLcode res = curl_easy_setopt(curlPut,CURLOPT_HEADERDATA,(void*)&headers); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR,"Error code " << res << " setting header data",logTag); + error = res; + return ""; + } + res = curl_easy_setopt(curlPut,CURLOPT_HEADERFUNCTION,&addToHeaderList); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error code " << res << " setting header function", logTag); + error = res; + return ""; + } + //curl_easy_setopt(curlPut,CURLOPT_VERBOSE,1); + // Set post vs put. + res = curl_easy_setopt(curlPut,CURLOPT_UPLOAD,1); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error code " << res << " setting option CURLOPT_UPLOAD" ,logTag); + error = res; + return ""; + } + + RstringAndIndex readHelper; + if (data.length() > 0 ) { + readHelper.theData = &data; + readHelper.numSent = 0; + res = curl_easy_setopt(curlPut,CURLOPT_READFUNCTION,&readFromRstring); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error code " << res << "setting read function", logTag); + error = res; + return ""; + } + + res = curl_easy_setopt(curlPut,CURLOPT_READDATA,&readHelper); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR,"Error code " << res << " setting read data", logTag); + error = res; + return ""; + } + } + res= curl_easy_setopt(curlPut,CURLOPT_FOLLOWLOCATION,0); + if (res != CURLE_OK) { + error = res; + return ""; + } + SPLAPPTRC(L_TRACE,"About to perform",logTag); + // Now handle read, for error checking and exception handling + SPL::rstring toReturn; + res = readResultAsRstring(curlPut,&toReturn); + if (res != CURLE_OK) { + error = res; + return ""; + } + + // ALL DONE! Do action. + res = curl_easy_perform(curlPut); + SPLAPPTRC(L_TRACE,"About to perform",logTag); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " on curl_easy_perform", logTag); + error = res; + return ""; + } + return toReturn; +} + + +// First pass as curl_get. +SPL::rstring httpGet(const SPL::rstring & url, const SPL::list & extraHeaders, const SPL::rstring & username, const SPL::rstring & password, SPL::int32& error) { + + // Curl is more efficient if we let it reuse handles + static __thread CURL* curlGet = NULL; + if (curlGet == NULL) { + curlGet = curl_easy_init(); + addCurlHandle(curlGet); + } + SPL::rstring toReturn; + CURLcode res; + // Let's make sure someone didn't forget to initialize before calling + error = 0; + if (curlGet) { + res = addCommonOpts(curlGet,url,extraHeaders,username, password); + if (res != 0 ) { + error = res; + return toReturn; + } + res = readResultAsRstring(curlGet,&toReturn); + + if (res != CURLE_OK) { + error = res; + return toReturn; + } + res = curl_easy_setopt(curlGet,CURLOPT_FOLLOWLOCATION,1); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR,"Error " << res << " setting follow location",logTag); + error = res; + return toReturn; + } + res = curl_easy_perform(curlGet); + if (res != CURLE_OK) { + SPLAPPTRC(L_ERROR, "Error " << res << " on curl_easy_perform", logTag); + error = res; + return toReturn; + } + long responseCode = 0; + curl_easy_getinfo(curlGet,CURLINFO_RESPONSE_CODE,&responseCode); + if (responseCode != 200) { + SPLAPPTRC(L_ERROR,"Unexpected response code " << responseCode,logTag); + error = -1; + } + } + return toReturn; +} + +} diff --git a/com.ibm.streamsx.inet/info.xml b/com.ibm.streamsx.inet/info.xml index 21861b0..ff5e025 100644 --- a/com.ibm.streamsx.inet/info.xml +++ b/com.ibm.streamsx.inet/info.xml @@ -4,7 +4,7 @@ com.ibm.streamsx.inet - + The Internet Toolkit provides support for common internet protocols. This toolkit separates its functionality into a number of namespaces: @@ -48,7 +48,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit 4. Start the InfoSphere Streams instance. 5. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio. - 2.6.1 + 2.7.0 4.0.0.0 diff --git a/samples/HTTPFunctions/Makefile b/samples/HTTPFunctions/Makefile new file mode 100644 index 0000000..d020c89 --- /dev/null +++ b/samples/HTTPFunctions/Makefile @@ -0,0 +1,21 @@ +# Copyright (C) 2011,2014, International Business Machines Corporation. +# All Rights Reserved. + +.PHONY: all standalone distributed clean + +STREAMS_INET_TOOLKIT ?=../../com.ibm.streamsx.inet:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.inet +SPLC_FLAGS ?= -a -t $(STREAMS_INET_TOOLKIT) +SPLC = $(STREAMS_INSTALL)/bin/sc +SPL_CMD_ARGS ?= +SPL_MAIN_COMPOSITE = com.ibm.streamsx.inet.sample::PostGet + +all: standalone + +standalone: + $(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS) + +distributed: + $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS) + +clean: + $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE) diff --git a/samples/HTTPFunctions/com.ibm.streamsx.inet.sample/GetPost.spl b/samples/HTTPFunctions/com.ibm.streamsx.inet.sample/GetPost.spl new file mode 100644 index 0000000..1349cf0 --- /dev/null +++ b/samples/HTTPFunctions/com.ibm.streamsx.inet.sample/GetPost.spl @@ -0,0 +1,89 @@ +namespace com.ibm.streamsx.inet.sample; +use com.ibm.streamsx.inet.http::httpGet; +use com.ibm.streamsx.inet.http::httpPost; +use com.ibm.streamsx.inet.rest::HTTPTupleInjection; +composite GetPost { +graph +stream GetStream = Custom() { + +logic onProcess: { + +mutable int32 error = 0; +rstring result1 = httpGet("http://weather.noaa.gov/pub/data/observations/metar/cycles/00Z.TXT", + (list)[],"","",error); +if (error == 0) { + submit({result=result1},GetStream); +} +else { + appTrc(Trace.error,"Error code was "+(rstring)error); +} +} + +} + +() as checkGet = Custom(GetStream) { + + logic state: { + mutable int32 numTuples = 0; + } + + onTuple GetStream: + { + if (result != "") { + numTuples++; + } + + } + onPunct GetStream: + { + if (currentPunct() == Sys.FinalMarker) { + if (numTuples != 1) { + abort(); + } + } + } +} + +() as sentTuples = Custom() { +logic onProcess: { + block(5.0); + mutable int32 error =0; + mutable int32 i = 0; + while (i < 5) { + rstring toPost = "str=This is tuple%20"+(rstring)i+"&iter="+(rstring)i; + appTrc(Trace.info,"To post: "+toPost); + mutable list replyHeaders = []; + httpPost(toPost,"http://127.0.0.1:8080/Received/ports/output/0/inject",["ContentType: application/x-www-form-urlencoded"],"","",replyHeaders,error); + if (error != 0) { + appTrc(Trace.error,"Error code on post was "+(rstring)error); + } + i++; + } + } // end onProcess +}// end custom + +stream Received = HTTPTupleInjection () { + +param +port: 8080; + +} + +() as postchecker = Custom(Received) { + + logic state: { + mutable int32 numTuples = 0; + } + onTuple Received: { + if (iter != numTuples) { + abort(); + } + numTuples++; + if (numTuples == 5) { + appTrc(Trace.error, "Application successful, exiting"); + shutdownPE(); + } + } +} + +} diff --git a/tests/tests.http/Makefile b/tests/tests.http/Makefile index 562790e..ed23300 100644 --- a/tests/tests.http/Makefile +++ b/tests/tests.http/Makefile @@ -1,6 +1,6 @@ # # ******************************************************************************* -# * Copyright (C)2014, International Business Machines Corporation and * +# * Copyright (C)2014-2015, International Business Machines Corporation and * # * others. All Rights Reserved. * # ******************************************************************************* # @@ -10,7 +10,7 @@ args=-t ../../com.ibm.streamsx.inet ns=com.ibm.streamsx.inet.http.tests outputdir=./output -tests=HTTPStreamBasicFunctionTestMain HTTPStreamBadUrlTestMain HTTPStreamConsistentRegionFailMain HTTPPostBasicFunctionTestMain HTTPPostConsistentRegionFailMain +tests=HTTPStreamBasicFunctionTestMain HTTPStreamBadUrlTestMain HTTPStreamConsistentRegionFailMain HTTPPostBasicFunctionTestMain HTTPPostConsistentRegionFailMain URLEncodeTestMain rntest=./scripts/testRunner.sh ftest=./scripts/expectFail.sh diff --git a/tests/tests.http/com.ibm.streamsx.inet.http.tests/URLEncode.spl b/tests/tests.http/com.ibm.streamsx.inet.http.tests/URLEncode.spl new file mode 100644 index 0000000..9ebd42c --- /dev/null +++ b/tests/tests.http/com.ibm.streamsx.inet.http.tests/URLEncode.spl @@ -0,0 +1,33 @@ +namespace com.ibm.streamsx.inet.http.tests; +use com.ibm.streamsx.inet.http::urlEncode; +use com.ibm.streamsx.inet.http::urlDecode; + +composite URLEncodeTestMain { + +graph +() as tester = Custom() { + +logic onProcess: { + + rstring raw = "This has spaces and newline \n."; + //println(raw); + rstring encoded = urlEncode(raw); + //println(encoded); + rstring decoded = urlDecode(encoded); + if (decoded != raw) { + abort(); + } + rstring raw2 = "This has a null \0 now stuff after the null"; + //println(raw2); + rstring encoded2 = urlEncode(raw2); + //println(encoded2); + rstring decoded2 = urlDecode(encoded2); + if (raw2 != decoded2) { + abort(); + } + +} + +} + +}