Skip to content

Commit

Permalink
HPCC-30164 Allow persistent connections to be configured per soapcall
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jan 5, 2024
1 parent 208ed71 commit 28c753e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 18 deletions.
7 changes: 4 additions & 3 deletions common/thorhelper/persistent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
{
}

virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
virtual void add(ISocket* sock, SocketEndpoint* ep, PersistentProtocol proto) override
{
if (!sock || !sock->isValid())
return;
Expand Down Expand Up @@ -249,7 +249,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
m_selectHandler->remove(sock);
}

virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne, unsigned overrideMaxRequests) override
{
PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep));
CriticalBlock block(m_critsect);
Expand All @@ -260,7 +260,8 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
if (info)
{
info->useCount += usesOverOne;
bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
unsigned requestLimit = overrideMaxRequests ? overrideMaxRequests : m_maxReqs;
bool reachedQuota = requestLimit > 0 && requestLimit <= info->useCount;
if(!sock->isValid())
keep = false;
if (keep && !reachedQuota)
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/persistent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface IPersistentHandler : implements IInterface
// Remove a socket from the pool
virtual void remove(ISocket* sock) = 0;
// Put a socket back to the pool for further reuse, or remove its record from the pool when "keep" is false
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0;
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0, unsigned overrideMaxRequests = 0) = 0;
// Get one available socket from the pool
virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) = 0;
virtual void stop(bool wait) = 0;
Expand Down
68 changes: 56 additions & 12 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ class BlackLister : public CInterface, implements IThreadFactory
}
} *blacklist;

static bool defaultUsePersistConnections = false;
static IPersistentHandler* persistentHandler = nullptr;
static CriticalSection globalFeatureCrit;
static std::atomic<bool> globalFeaturesInitDone{false};
Expand All @@ -568,18 +569,25 @@ void initGlobalFeatures()
CriticalBlock block(globalFeatureCrit);
if (!globalFeaturesInitDone)
{
int maxPersistentRequests = 0;
int maxPersistentRequests = 100;
defaultUsePersistConnections = false;
if (!isContainerized())
{
defaultUsePersistConnections = queryEnvironmentConf().getPropBool("useHttpCallPersistentRequests", defaultUsePersistConnections);
maxPersistentRequests = queryEnvironmentConf().getPropInt("maxHttpCallPersistentRequests", maxPersistentRequests); //global (backward compatible)
}

Owned<IPropertyTree> conf = getComponentConfig();
defaultUsePersistConnections = conf->getPropBool("@useHttpCallPersistentRequests", defaultUsePersistConnections);
maxPersistentRequests = conf->getPropInt("@maxHttpCallPersistentRequests", maxPersistentRequests); //component config wins
mapUrlsToSecrets = conf->getPropBool("@mapHttpCallUrlsToSecrets", false);
warnIfUrlNotMappedToSecret = conf->getPropBool("@warnIfUrlNotMappedToSecret", mapUrlsToSecrets);
requireUrlsMappedToSecrets = conf->getPropBool("@requireUrlsMappedToSecrets", false);

if (maxPersistentRequests != 0)
persistentHandler = createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, maxPersistentRequests, PersistentLogLevel::PLogMin, true);
else
defaultUsePersistConnections = false;

globalFeaturesInitDone = true;
}
Expand All @@ -601,6 +609,7 @@ MODULE_EXIT()
{
persistentHandler->stop(true);
::Release(persistentHandler);
persistentHandler = nullptr;
}
}

Expand Down Expand Up @@ -876,7 +885,7 @@ class CWSCHelperThread : public Thread
}
};

bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool required, WSCType wscType)
bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool & persistEnabled, unsigned & persistMaxRequests, bool required, WSCType wscType)
{
Owned<const IPropertyTree> secret;
if (!isEmptyString(secretName))
Expand Down Expand Up @@ -907,6 +916,15 @@ bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &ur
urlListParser.getUrls(urlArray, usernamePasswordPair);
getSecretKeyValue(proxyAddress.clear(), secret, "proxy");
getSecretKeyValue(issuer, secret, "issuer");

//Options defined in the secret override the defaults and the values specified in ECL
StringBuffer persist;
if (getSecretKeyValue(persist, secret, "persist"))
persistEnabled = strToBool(persist.str());

if (getSecretKeyValue(persist.clear(), secret, "persistMaxRequests"))
persistMaxRequests = atoi(persist);

return true;
}

Expand Down Expand Up @@ -934,6 +952,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface
bool complete;
std::atomic_bool timeLimitExceeded{false};
bool customClientCert = false;
bool persistEnabled = false;
unsigned persistMaxRequests = 0;
StringAttr clientCertIssuer;
IRoxieAbortMonitor * roxieAbortMonitor;
StringBuffer issuer; //TBD sync up with other PR, it will benefit from this being able to come from the secret
Expand Down Expand Up @@ -988,6 +1008,25 @@ class CWSCHelper : implements IWSCHelper, public CInterface
else
timeLimitMS = (unsigned)(dval * 1000);

persistEnabled = defaultUsePersistConnections;
persistMaxRequests = 0; // 0 implies do not override the default pool size
if (flags & SOAPFpersist)
{
if (flags & SOAPFpersistMax)
{
unsigned maxRequests = helper->getPersistMaxRequests();
if (maxRequests != 0)
{
persistEnabled = true;
persistMaxRequests = maxRequests;
}
else
persistEnabled = false;
}
else
persistEnabled = true;
}

if (flags & SOAPFhttpheaders)
httpHeaders.set(s.setown(helper->getHttpHeaders()));
if (flags & SOAPFxpathhints)
Expand Down Expand Up @@ -1097,7 +1136,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface
}
StringBuffer secretName("http-connect-");
secretName.append(finger);
loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, true, wscType);
loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, true, wscType);
}
else
{
Expand All @@ -1108,11 +1147,11 @@ class CWSCHelper : implements IWSCHelper, public CInterface
StringBuffer secretName;
UrlArray tempArray;
//TBD: If this is a list of URLs do we A. not check for a mapped secret, B. check the first one, C. Use long secret name including entire list
Url &url = urlArray.tos();
Url &url = urlArray.item(0);
url.getDynamicUrlSecretName(secretName);
if (secretName.length())
{
if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, requireUrlsMappedToSecrets, wscType))
if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, requireUrlsMappedToSecrets, wscType))
{
logctx.CTXLOG("Mapped %s URL!", wscCallTypeText());
if (tempArray.length())
Expand All @@ -1131,6 +1170,9 @@ class CWSCHelper : implements IWSCHelper, public CInterface
if (numUrls == 0)
throw MakeStringException(0, "%s specified no URLs", getWsCallTypeName(wscType));

if (!persistentHandler)
persistEnabled = false;

if (!proxyAddress.isEmpty())
{
UrlListParser proxyUrlListParser(proxyAddress);
Expand Down Expand Up @@ -1316,6 +1358,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface
}
inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
inline const char * wscCallTypeText() const { return getWsCallTypeName(wscType); }
inline bool usePersistConnections() const { return persistEnabled; }
inline unsigned getPersistMaxRequests() const { return persistMaxRequests; }

protected:
friend class CWSCHelperThread;
Expand Down Expand Up @@ -2411,7 +2455,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (strieq(url.method, "https"))
proto = PersistentProtocol::ProtoTLS;
bool shouldClose = false;
Owned<ISocket> psock = persistentHandler?persistentHandler->getAvailable(&ep, &shouldClose, proto):nullptr;
Owned<ISocket> psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr;
if (psock)
{
isReused = true;
Expand Down Expand Up @@ -2521,18 +2565,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
processResponse(url, response, meta, contentType);
delete meta;

if (persistentHandler)
if (master->usePersistConnections())
{
if (isReused)
persistentHandler->doneUsing(socket, keepAlive);
persistentHandler->doneUsing(socket, keepAlive, master->getPersistMaxRequests());
else if (keepAlive)
persistentHandler->add(socket, &ep, proto);
}
break;
}
catch (IReceivedRoxieException *e)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
// server busy ... Sleep and retry
if (e->errorCode() == 1001)
Expand Down Expand Up @@ -2564,7 +2608,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
catch (IException *e)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if (master->timeLimitExceeded)
{
Expand Down Expand Up @@ -2597,15 +2641,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
catch (std::exception & es)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if(dynamic_cast<std::bad_alloc *>(&es))
throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
}
catch (...)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
throw MakeStringException(-1, "Unknown exception in processQuery");
}
Expand Down
4 changes: 2 additions & 2 deletions testing/regress/ecl/common/SoapTextTest.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ EXPORT SoapTextTest := MODULE

EXPORT doMain(string serviceUrl, string searchWords, unsigned documentLimit) := FUNCTION

soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse).cnt;
soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse, PERSIST).cnt;
callDocumentCount(string search) := IF((serviceUrl != ''), soapcallDocumentCount(search), doDocumentCount(search));

soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord));
soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord), PERSIST);
callSearchWords(DATASET(wordRec) searchWords) := IF((serviceUrl != ''), soapcallSearchWords(searchWords), doSearchWords(searchWords));

splitWords := Std.Str.SplitWords(searchWords, ',', false);
Expand Down

0 comments on commit 28c753e

Please sign in to comment.