Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
jakesmith committed Jan 11, 2024
2 parents 278b37c + 870c0f7 commit 335169b
Show file tree
Hide file tree
Showing 71 changed files with 565 additions and 729 deletions.
21 changes: 11 additions & 10 deletions common/thorhelper/persistent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
{
}

virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
virtual void add(ISocket* sock, SocketEndpoint* ep, PersistentProtocol proto) override
{
if (!sock || !sock->isValid())
return;
Expand All @@ -223,7 +223,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
}
}
m_selectHandler->add(sock, SELECTMODE_READ, this);
Owned<CPersistentInfo> info = new CPersistentInfo(false, usTick()/1000, 0, ep, proto, sock);
Owned<CPersistentInfo> info = new CPersistentInfo(false, msTick(), 0, ep, proto, sock);
m_infomap.setValue(sock, info.getLink());
m_availkeeper.add(info);
}
Expand All @@ -249,7 +249,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
m_selectHandler->remove(sock);
}

virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne, unsigned overrideMaxRequests) override
{
PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep));
CriticalBlock block(m_critsect);
Expand All @@ -260,13 +260,14 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
if (info)
{
info->useCount += usesOverOne;
bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
unsigned requestLimit = overrideMaxRequests ? overrideMaxRequests : m_maxReqs;
bool reachedQuota = requestLimit > 0 && requestLimit <= info->useCount;
if(!sock->isValid())
keep = false;
if (keep && !reachedQuota)
{
info->inUse = false;
info->timeUsed = usTick()/1000;
info->timeUsed = msTick();
m_selectHandler->add(sock, SELECTMODE_READ, this);
m_availkeeper.add(info);
}
Expand All @@ -289,7 +290,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
{
Linked<ISocket> sock = info->sock;
info->inUse = true;
info->timeUsed = usTick()/1000;
info->timeUsed = msTick();
info->useCount++;
if (pShouldClose != nullptr)
*pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
Expand Down Expand Up @@ -353,7 +354,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
{
m_availkeeper.remove(info);
info->inUse = true;
info->timeUsed = usTick()/1000;
info->timeUsed = msTick();
info->useCount++;
reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
}
Expand Down Expand Up @@ -391,7 +392,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
m_waitsem.wait(1000);
if (m_stop)
break;
unsigned now = usTick()/1000;
unsigned now = msTick();
CriticalBlock block(m_critsect);
std::vector<ISocket*> socks1;
std::vector<ISocket*> socks2;
Expand All @@ -400,9 +401,9 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
CPersistentInfo* info = si.getValue();
if (!info)
continue;
if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now)
if(m_maxIdleTime > 0 && !info->inUse && now - info->timeUsed >= m_maxIdleTime*1000)
socks1.push_back(*(ISocket**)(si.getKey()));
if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now)
if(info->inUse && now - info->timeUsed >= MAX_INFLIGHT_TIME*1000)
socks2.push_back(*(ISocket**)(si.getKey()));
}
for (auto& s:socks1)
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/persistent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface IPersistentHandler : implements IInterface
// Remove a socket from the pool
virtual void remove(ISocket* sock) = 0;
// Put a socket back to the pool for further reuse, or remove its record from the pool when "keep" is false
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0;
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0, unsigned overrideMaxRequests = 0) = 0;
// Get one available socket from the pool
virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) = 0;
virtual void stop(bool wait) = 0;
Expand Down
71 changes: 59 additions & 12 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ class BlackLister : public CInterface, implements IThreadFactory
}
} *blacklist;

static bool defaultUsePersistConnections = false;
static IPersistentHandler* persistentHandler = nullptr;
static CriticalSection globalFeatureCrit;
static std::atomic<bool> globalFeaturesInitDone{false};
Expand All @@ -568,18 +569,25 @@ void initGlobalFeatures()
CriticalBlock block(globalFeatureCrit);
if (!globalFeaturesInitDone)
{
int maxPersistentRequests = 0;
int maxPersistentRequests = 100;
defaultUsePersistConnections = false;
if (!isContainerized())
{
defaultUsePersistConnections = queryEnvironmentConf().getPropBool("useHttpCallPersistentRequests", defaultUsePersistConnections);
maxPersistentRequests = queryEnvironmentConf().getPropInt("maxHttpCallPersistentRequests", maxPersistentRequests); //global (backward compatible)
}

Owned<IPropertyTree> conf = getComponentConfig();
defaultUsePersistConnections = conf->getPropBool("@useHttpCallPersistentRequests", defaultUsePersistConnections);
maxPersistentRequests = conf->getPropInt("@maxHttpCallPersistentRequests", maxPersistentRequests); //component config wins
mapUrlsToSecrets = conf->getPropBool("@mapHttpCallUrlsToSecrets", false);
warnIfUrlNotMappedToSecret = conf->getPropBool("@warnIfUrlNotMappedToSecret", mapUrlsToSecrets);
requireUrlsMappedToSecrets = conf->getPropBool("@requireUrlsMappedToSecrets", false);

if (maxPersistentRequests != 0)
persistentHandler = createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, maxPersistentRequests, PersistentLogLevel::PLogMin, true);
else
defaultUsePersistConnections = false;

globalFeaturesInitDone = true;
}
Expand All @@ -601,6 +609,7 @@ MODULE_EXIT()
{
persistentHandler->stop(true);
::Release(persistentHandler);
persistentHandler = nullptr;
}
}

Expand Down Expand Up @@ -876,7 +885,7 @@ class CWSCHelperThread : public Thread
}
};

bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool required, WSCType wscType)
bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool & persistEnabled, unsigned & persistMaxRequests, bool required, WSCType wscType)
{
Owned<const IPropertyTree> secret;
if (!isEmptyString(secretName))
Expand Down Expand Up @@ -907,6 +916,15 @@ bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &ur
urlListParser.getUrls(urlArray, usernamePasswordPair);
getSecretKeyValue(proxyAddress.clear(), secret, "proxy");
getSecretKeyValue(issuer, secret, "issuer");

//Options defined in the secret override the defaults and the values specified in ECL
StringBuffer persist;
if (getSecretKeyValue(persist, secret, "persist"))
persistEnabled = strToBool(persist.str());

if (getSecretKeyValue(persist.clear(), secret, "persistMaxRequests"))
persistMaxRequests = atoi(persist);

return true;
}

Expand Down Expand Up @@ -934,6 +952,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface
bool complete;
std::atomic_bool timeLimitExceeded{false};
bool customClientCert = false;
bool persistEnabled = false;
unsigned persistMaxRequests = 0;
StringAttr clientCertIssuer;
IRoxieAbortMonitor * roxieAbortMonitor;
StringBuffer issuer; //TBD sync up with other PR, it will benefit from this being able to come from the secret
Expand Down Expand Up @@ -988,6 +1008,25 @@ class CWSCHelper : implements IWSCHelper, public CInterface
else
timeLimitMS = (unsigned)(dval * 1000);

persistEnabled = defaultUsePersistConnections;
persistMaxRequests = 0; // 0 implies do not override the default pool size
if (flags & SOAPFpersist)
{
if (flags & SOAPFpersistMax)
{
unsigned maxRequests = helper->getPersistMaxRequests();
if (maxRequests != 0)
{
persistEnabled = true;
persistMaxRequests = maxRequests;
}
else
persistEnabled = false;
}
else
persistEnabled = true;
}

if (flags & SOAPFhttpheaders)
httpHeaders.set(s.setown(helper->getHttpHeaders()));
if (flags & SOAPFxpathhints)
Expand Down Expand Up @@ -1097,7 +1136,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface
}
StringBuffer secretName("http-connect-");
secretName.append(finger);
loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, true, wscType);
loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, true, wscType);
}
else
{
Expand All @@ -1108,11 +1147,11 @@ class CWSCHelper : implements IWSCHelper, public CInterface
StringBuffer secretName;
UrlArray tempArray;
//TBD: If this is a list of URLs do we A. not check for a mapped secret, B. check the first one, C. Use long secret name including entire list
Url &url = urlArray.tos();
Url &url = urlArray.item(0);
url.getDynamicUrlSecretName(secretName);
if (secretName.length())
{
if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, requireUrlsMappedToSecrets, wscType))
if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, requireUrlsMappedToSecrets, wscType))
{
logctx.CTXLOG("Mapped %s URL!", wscCallTypeText());
if (tempArray.length())
Expand All @@ -1131,6 +1170,9 @@ class CWSCHelper : implements IWSCHelper, public CInterface
if (numUrls == 0)
throw MakeStringException(0, "%s specified no URLs", getWsCallTypeName(wscType));

if (!persistentHandler)
persistEnabled = false;

if (!proxyAddress.isEmpty())
{
UrlListParser proxyUrlListParser(proxyAddress);
Expand Down Expand Up @@ -1316,6 +1358,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface
}
inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
inline const char * wscCallTypeText() const { return getWsCallTypeName(wscType); }
inline bool usePersistConnections() const { return persistEnabled; }
inline unsigned getPersistMaxRequests() const { return persistMaxRequests; }

protected:
friend class CWSCHelperThread;
Expand Down Expand Up @@ -2407,11 +2451,14 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
checkTimeLimitExceeded(&remainingMS);
Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
ep.set(connUrl.host.get(), connUrl.port);
if (ep.isNull())
throw MakeStringException(-1, "Failed to resolve host '%s'", nullText(connUrl.host.get()));

checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ...
if (strieq(url.method, "https"))
proto = PersistentProtocol::ProtoTLS;
bool shouldClose = false;
Owned<ISocket> psock = persistentHandler?persistentHandler->getAvailable(&ep, &shouldClose, proto):nullptr;
Owned<ISocket> psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr;
if (psock)
{
isReused = true;
Expand Down Expand Up @@ -2521,18 +2568,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
processResponse(url, response, meta, contentType);
delete meta;

if (persistentHandler)
if (master->usePersistConnections())
{
if (isReused)
persistentHandler->doneUsing(socket, keepAlive);
persistentHandler->doneUsing(socket, keepAlive, master->getPersistMaxRequests());
else if (keepAlive)
persistentHandler->add(socket, &ep, proto);
}
break;
}
catch (IReceivedRoxieException *e)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
// server busy ... Sleep and retry
if (e->errorCode() == 1001)
Expand Down Expand Up @@ -2564,7 +2611,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
catch (IException *e)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if (master->timeLimitExceeded)
{
Expand Down Expand Up @@ -2597,15 +2644,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
catch (std::exception & es)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if(dynamic_cast<std::bad_alloc *>(&es))
throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
}
catch (...)
{
if (persistentHandler && isReused)
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
throw MakeStringException(-1, "Unknown exception in processQuery");
}
Expand Down
24 changes: 12 additions & 12 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,30 +191,30 @@ static IPropertyTree *getCostPropTree(const char *cluster)
}
}

extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays)
extern da_decl cost_type calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays)
{
Owned<const IPropertyTree> costPT = getCostPropTree(cluster);

if (costPT==nullptr)
return 0.0;
return 0;
double atRestPrice = costPT->getPropReal("@storageAtRest", 0.0);
double storageCostDaily = atRestPrice * 12 / 365;
return storageCostDaily * sizeGB * fileAgeDays;
return money2cost_type(storageCostDaily * sizeGB * fileAgeDays);
}

extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads)
extern da_decl cost_type calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads)
{
Owned<const IPropertyTree> costPT = getCostPropTree(cluster);

if (costPT==nullptr)
return 0.0;
return 0;
constexpr int accessPriceScalingFactor = 10000; // read/write pricing based on 10,000 operations
double readPrice = costPT->getPropReal("@storageReads", 0.0);
double writePrice = costPT->getPropReal("@storageWrites", 0.0);
return (readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor);
return money2cost_type((readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor));
}

extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
StringBuffer clusterName;
// Should really specify the cluster number too, but this is the best we can do for now
Expand Down Expand Up @@ -4942,7 +4942,7 @@ protected: friend class CDistributedFilePart;
else
return false;
}
virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override
virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) override
{
CDateTime dt;
getModificationTime(dt);
Expand All @@ -4956,7 +4956,7 @@ protected: friend class CDistributedFilePart;
if (hasReadWriteCostFields(*attrs))
{
// Newer files have readCost and writeCost attributes
accessCost = cost_type2money(attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)));
accessCost = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost));
}
else
{
Expand Down Expand Up @@ -7041,12 +7041,12 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
return false;
}

virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override
virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) override
{
CriticalBlock block (sect);
ForEachItemIn(i,subfiles)
{
double tmpAtRestcost, tmpAccessCost;
cost_type tmpAtRestcost, tmpAccessCost;
IDistributedFile &f = subfiles.item(i);
f.getCost(cluster, tmpAtRestcost, tmpAccessCost);
atRestCost += tmpAtRestcost;
Expand Down Expand Up @@ -13454,7 +13454,7 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu
else
sizeDiskSize = file->getPropInt64(getDFUQResultFieldName(DFUQRForigsize), 0);
double sizeGB = sizeDiskSize / ((double)1024 * 1024 * 1024);
cost_type atRestCost = money2cost_type(calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays));
cost_type atRestCost = calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays);
file->setPropInt64(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost);

// Dyamically calc and set the access cost field and for legacy files set read/write cost fields
Expand Down
Loading

0 comments on commit 335169b

Please sign in to comment.