From 28c753eb8dcaa27b0f6811940725fa450c06f5e0 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Thu, 4 Jan 2024 16:52:14 +0000 Subject: [PATCH] HPCC-30164 Allow persistent connections to be configured per soapcall Signed-off-by: Gavin Halliday --- common/thorhelper/persistent.cpp | 7 ++- common/thorhelper/persistent.hpp | 2 +- common/thorhelper/thorsoapcall.cpp | 68 +++++++++++++++++---- testing/regress/ecl/common/SoapTextTest.ecl | 4 +- 4 files changed, 63 insertions(+), 18 deletions(-) diff --git a/common/thorhelper/persistent.cpp b/common/thorhelper/persistent.cpp index 52133b0d2d0..edc1a4249f7 100644 --- a/common/thorhelper/persistent.cpp +++ b/common/thorhelper/persistent.cpp @@ -205,7 +205,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele { } - virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override + virtual void add(ISocket* sock, SocketEndpoint* ep, PersistentProtocol proto) override { if (!sock || !sock->isValid()) return; @@ -249,7 +249,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele m_selectHandler->remove(sock); } - virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override + virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne, unsigned overrideMaxRequests) override { PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep)); CriticalBlock block(m_critsect); @@ -260,7 +260,8 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele if (info) { info->useCount += usesOverOne; - bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount; + unsigned requestLimit = overrideMaxRequests ? overrideMaxRequests : m_maxReqs; + bool reachedQuota = requestLimit > 0 && requestLimit <= info->useCount; if(!sock->isValid()) keep = false; if (keep && !reachedQuota) diff --git a/common/thorhelper/persistent.hpp b/common/thorhelper/persistent.hpp index 3276ac69b8b..7c914135b5a 100644 --- a/common/thorhelper/persistent.hpp +++ b/common/thorhelper/persistent.hpp @@ -39,7 +39,7 @@ interface IPersistentHandler : implements IInterface // Remove a socket from the pool virtual void remove(ISocket* sock) = 0; // Put a socket back to the pool for further reuse, or remove its record from the pool when "keep" is false - virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0; + virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0, unsigned overrideMaxRequests = 0) = 0; // Get one available socket from the pool virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) = 0; virtual void stop(bool wait) = 0; diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index 814ba8b26a8..16aa3f24f54 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -556,6 +556,7 @@ class BlackLister : public CInterface, implements IThreadFactory } } *blacklist; +static bool defaultUsePersistConnections = false; static IPersistentHandler* persistentHandler = nullptr; static CriticalSection globalFeatureCrit; static std::atomic globalFeaturesInitDone{false}; @@ -568,11 +569,16 @@ void initGlobalFeatures() CriticalBlock block(globalFeatureCrit); if (!globalFeaturesInitDone) { - int maxPersistentRequests = 0; + int maxPersistentRequests = 100; + defaultUsePersistConnections = false; if (!isContainerized()) + { + defaultUsePersistConnections = queryEnvironmentConf().getPropBool("useHttpCallPersistentRequests", defaultUsePersistConnections); maxPersistentRequests = queryEnvironmentConf().getPropInt("maxHttpCallPersistentRequests", maxPersistentRequests); //global (backward compatible) + } Owned conf = getComponentConfig(); + defaultUsePersistConnections = conf->getPropBool("@useHttpCallPersistentRequests", defaultUsePersistConnections); maxPersistentRequests = conf->getPropInt("@maxHttpCallPersistentRequests", maxPersistentRequests); //component config wins mapUrlsToSecrets = conf->getPropBool("@mapHttpCallUrlsToSecrets", false); warnIfUrlNotMappedToSecret = conf->getPropBool("@warnIfUrlNotMappedToSecret", mapUrlsToSecrets); @@ -580,6 +586,8 @@ void initGlobalFeatures() if (maxPersistentRequests != 0) persistentHandler = createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, maxPersistentRequests, PersistentLogLevel::PLogMin, true); + else + defaultUsePersistConnections = false; globalFeaturesInitDone = true; } @@ -601,6 +609,7 @@ MODULE_EXIT() { persistentHandler->stop(true); ::Release(persistentHandler); + persistentHandler = nullptr; } } @@ -876,7 +885,7 @@ class CWSCHelperThread : public Thread } }; -bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool required, WSCType wscType) +bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool & persistEnabled, unsigned & persistMaxRequests, bool required, WSCType wscType) { Owned secret; if (!isEmptyString(secretName)) @@ -907,6 +916,15 @@ bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &ur urlListParser.getUrls(urlArray, usernamePasswordPair); getSecretKeyValue(proxyAddress.clear(), secret, "proxy"); getSecretKeyValue(issuer, secret, "issuer"); + + //Options defined in the secret override the defaults and the values specified in ECL + StringBuffer persist; + if (getSecretKeyValue(persist, secret, "persist")) + persistEnabled = strToBool(persist.str()); + + if (getSecretKeyValue(persist.clear(), secret, "persistMaxRequests")) + persistMaxRequests = atoi(persist); + return true; } @@ -934,6 +952,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface bool complete; std::atomic_bool timeLimitExceeded{false}; bool customClientCert = false; + bool persistEnabled = false; + unsigned persistMaxRequests = 0; StringAttr clientCertIssuer; IRoxieAbortMonitor * roxieAbortMonitor; StringBuffer issuer; //TBD sync up with other PR, it will benefit from this being able to come from the secret @@ -988,6 +1008,25 @@ class CWSCHelper : implements IWSCHelper, public CInterface else timeLimitMS = (unsigned)(dval * 1000); + persistEnabled = defaultUsePersistConnections; + persistMaxRequests = 0; // 0 implies do not override the default pool size + if (flags & SOAPFpersist) + { + if (flags & SOAPFpersistMax) + { + unsigned maxRequests = helper->getPersistMaxRequests(); + if (maxRequests != 0) + { + persistEnabled = true; + persistMaxRequests = maxRequests; + } + else + persistEnabled = false; + } + else + persistEnabled = true; + } + if (flags & SOAPFhttpheaders) httpHeaders.set(s.setown(helper->getHttpHeaders())); if (flags & SOAPFxpathhints) @@ -1097,7 +1136,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface } StringBuffer secretName("http-connect-"); secretName.append(finger); - loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, true, wscType); + loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, true, wscType); } else { @@ -1108,11 +1147,11 @@ class CWSCHelper : implements IWSCHelper, public CInterface StringBuffer secretName; UrlArray tempArray; //TBD: If this is a list of URLs do we A. not check for a mapped secret, B. check the first one, C. Use long secret name including entire list - Url &url = urlArray.tos(); + Url &url = urlArray.item(0); url.getDynamicUrlSecretName(secretName); if (secretName.length()) { - if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, requireUrlsMappedToSecrets, wscType)) + if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, requireUrlsMappedToSecrets, wscType)) { logctx.CTXLOG("Mapped %s URL!", wscCallTypeText()); if (tempArray.length()) @@ -1131,6 +1170,9 @@ class CWSCHelper : implements IWSCHelper, public CInterface if (numUrls == 0) throw MakeStringException(0, "%s specified no URLs", getWsCallTypeName(wscType)); + if (!persistentHandler) + persistEnabled = false; + if (!proxyAddress.isEmpty()) { UrlListParser proxyUrlListParser(proxyAddress); @@ -1316,6 +1358,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface } inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; } inline const char * wscCallTypeText() const { return getWsCallTypeName(wscType); } + inline bool usePersistConnections() const { return persistEnabled; } + inline unsigned getPersistMaxRequests() const { return persistMaxRequests; } protected: friend class CWSCHelperThread; @@ -2411,7 +2455,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (strieq(url.method, "https")) proto = PersistentProtocol::ProtoTLS; bool shouldClose = false; - Owned psock = persistentHandler?persistentHandler->getAvailable(&ep, &shouldClose, proto):nullptr; + Owned psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr; if (psock) { isReused = true; @@ -2521,10 +2565,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo processResponse(url, response, meta, contentType); delete meta; - if (persistentHandler) + if (master->usePersistConnections()) { if (isReused) - persistentHandler->doneUsing(socket, keepAlive); + persistentHandler->doneUsing(socket, keepAlive, master->getPersistMaxRequests()); else if (keepAlive) persistentHandler->add(socket, &ep, proto); } @@ -2532,7 +2576,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (IReceivedRoxieException *e) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); // server busy ... Sleep and retry if (e->errorCode() == 1001) @@ -2564,7 +2608,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (IException *e) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); if (master->timeLimitExceeded) { @@ -2597,7 +2641,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (std::exception & es) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); if(dynamic_cast(&es)) throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery"); @@ -2605,7 +2649,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (...) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); throw MakeStringException(-1, "Unknown exception in processQuery"); } diff --git a/testing/regress/ecl/common/SoapTextTest.ecl b/testing/regress/ecl/common/SoapTextTest.ecl index afb91f99709..26b1d0fbbf9 100644 --- a/testing/regress/ecl/common/SoapTextTest.ecl +++ b/testing/regress/ecl/common/SoapTextTest.ecl @@ -86,10 +86,10 @@ EXPORT SoapTextTest := MODULE EXPORT doMain(string serviceUrl, string searchWords, unsigned documentLimit) := FUNCTION - soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse).cnt; + soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse, PERSIST).cnt; callDocumentCount(string search) := IF((serviceUrl != ''), soapcallDocumentCount(search), doDocumentCount(search)); - soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord)); + soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord), PERSIST); callSearchWords(DATASET(wordRec) searchWords) := IF((serviceUrl != ''), soapcallSearchWords(searchWords), doSearchWords(searchWords)); splitWords := Std.Str.SplitWords(searchWords, ',', false);