diff --git a/common/thorhelper/persistent.cpp b/common/thorhelper/persistent.cpp index 52133b0d2d0..9a3889e6195 100644 --- a/common/thorhelper/persistent.cpp +++ b/common/thorhelper/persistent.cpp @@ -205,7 +205,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele { } - virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override + virtual void add(ISocket* sock, SocketEndpoint* ep, PersistentProtocol proto) override { if (!sock || !sock->isValid()) return; @@ -223,7 +223,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele } } m_selectHandler->add(sock, SELECTMODE_READ, this); - Owned info = new CPersistentInfo(false, usTick()/1000, 0, ep, proto, sock); + Owned info = new CPersistentInfo(false, msTick(), 0, ep, proto, sock); m_infomap.setValue(sock, info.getLink()); m_availkeeper.add(info); } @@ -249,7 +249,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele m_selectHandler->remove(sock); } - virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override + virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne, unsigned overrideMaxRequests) override { PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep)); CriticalBlock block(m_critsect); @@ -260,13 +260,14 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele if (info) { info->useCount += usesOverOne; - bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount; + unsigned requestLimit = overrideMaxRequests ? overrideMaxRequests : m_maxReqs; + bool reachedQuota = requestLimit > 0 && requestLimit <= info->useCount; if(!sock->isValid()) keep = false; if (keep && !reachedQuota) { info->inUse = false; - info->timeUsed = usTick()/1000; + info->timeUsed = msTick(); m_selectHandler->add(sock, SELECTMODE_READ, this); m_availkeeper.add(info); } @@ -289,7 +290,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele { Linked sock = info->sock; info->inUse = true; - info->timeUsed = usTick()/1000; + info->timeUsed = msTick(); info->useCount++; if (pShouldClose != nullptr) *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount; @@ -353,7 +354,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele { m_availkeeper.remove(info); info->inUse = true; - info->timeUsed = usTick()/1000; + info->timeUsed = msTick(); info->useCount++; reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount; } @@ -391,7 +392,7 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele m_waitsem.wait(1000); if (m_stop) break; - unsigned now = usTick()/1000; + unsigned now = msTick(); CriticalBlock block(m_critsect); std::vector socks1; std::vector socks2; @@ -400,9 +401,9 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele CPersistentInfo* info = si.getValue(); if (!info) continue; - if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now) + if(m_maxIdleTime > 0 && !info->inUse && now - info->timeUsed >= m_maxIdleTime*1000) socks1.push_back(*(ISocket**)(si.getKey())); - if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now) + if(info->inUse && now - info->timeUsed >= MAX_INFLIGHT_TIME*1000) socks2.push_back(*(ISocket**)(si.getKey())); } for (auto& s:socks1) diff --git a/common/thorhelper/persistent.hpp b/common/thorhelper/persistent.hpp index 3276ac69b8b..7c914135b5a 100644 --- a/common/thorhelper/persistent.hpp +++ b/common/thorhelper/persistent.hpp @@ -39,7 +39,7 @@ interface IPersistentHandler : implements IInterface // Remove a socket from the pool virtual void remove(ISocket* sock) = 0; // Put a socket back to the pool for further reuse, or remove its record from the pool when "keep" is false - virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0; + virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0, unsigned overrideMaxRequests = 0) = 0; // Get one available socket from the pool virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) = 0; virtual void stop(bool wait) = 0; diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index 814ba8b26a8..903bbb52909 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -556,6 +556,7 @@ class BlackLister : public CInterface, implements IThreadFactory } } *blacklist; +static bool defaultUsePersistConnections = false; static IPersistentHandler* persistentHandler = nullptr; static CriticalSection globalFeatureCrit; static std::atomic globalFeaturesInitDone{false}; @@ -568,11 +569,16 @@ void initGlobalFeatures() CriticalBlock block(globalFeatureCrit); if (!globalFeaturesInitDone) { - int maxPersistentRequests = 0; + int maxPersistentRequests = 100; + defaultUsePersistConnections = false; if (!isContainerized()) + { + defaultUsePersistConnections = queryEnvironmentConf().getPropBool("useHttpCallPersistentRequests", defaultUsePersistConnections); maxPersistentRequests = queryEnvironmentConf().getPropInt("maxHttpCallPersistentRequests", maxPersistentRequests); //global (backward compatible) + } Owned conf = getComponentConfig(); + defaultUsePersistConnections = conf->getPropBool("@useHttpCallPersistentRequests", defaultUsePersistConnections); maxPersistentRequests = conf->getPropInt("@maxHttpCallPersistentRequests", maxPersistentRequests); //component config wins mapUrlsToSecrets = conf->getPropBool("@mapHttpCallUrlsToSecrets", false); warnIfUrlNotMappedToSecret = conf->getPropBool("@warnIfUrlNotMappedToSecret", mapUrlsToSecrets); @@ -580,6 +586,8 @@ void initGlobalFeatures() if (maxPersistentRequests != 0) persistentHandler = createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, maxPersistentRequests, PersistentLogLevel::PLogMin, true); + else + defaultUsePersistConnections = false; globalFeaturesInitDone = true; } @@ -601,6 +609,7 @@ MODULE_EXIT() { persistentHandler->stop(true); ::Release(persistentHandler); + persistentHandler = nullptr; } } @@ -876,7 +885,7 @@ class CWSCHelperThread : public Thread } }; -bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool required, WSCType wscType) +bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &urlArray, StringBuffer &issuer, StringBuffer &proxyAddress, bool & persistEnabled, unsigned & persistMaxRequests, bool required, WSCType wscType) { Owned secret; if (!isEmptyString(secretName)) @@ -907,6 +916,15 @@ bool loadConnectSecret(const char *vaultId, const char *secretName, UrlArray &ur urlListParser.getUrls(urlArray, usernamePasswordPair); getSecretKeyValue(proxyAddress.clear(), secret, "proxy"); getSecretKeyValue(issuer, secret, "issuer"); + + //Options defined in the secret override the defaults and the values specified in ECL + StringBuffer persist; + if (getSecretKeyValue(persist, secret, "persist")) + persistEnabled = strToBool(persist.str()); + + if (getSecretKeyValue(persist.clear(), secret, "persistMaxRequests")) + persistMaxRequests = atoi(persist); + return true; } @@ -934,6 +952,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface bool complete; std::atomic_bool timeLimitExceeded{false}; bool customClientCert = false; + bool persistEnabled = false; + unsigned persistMaxRequests = 0; StringAttr clientCertIssuer; IRoxieAbortMonitor * roxieAbortMonitor; StringBuffer issuer; //TBD sync up with other PR, it will benefit from this being able to come from the secret @@ -988,6 +1008,25 @@ class CWSCHelper : implements IWSCHelper, public CInterface else timeLimitMS = (unsigned)(dval * 1000); + persistEnabled = defaultUsePersistConnections; + persistMaxRequests = 0; // 0 implies do not override the default pool size + if (flags & SOAPFpersist) + { + if (flags & SOAPFpersistMax) + { + unsigned maxRequests = helper->getPersistMaxRequests(); + if (maxRequests != 0) + { + persistEnabled = true; + persistMaxRequests = maxRequests; + } + else + persistEnabled = false; + } + else + persistEnabled = true; + } + if (flags & SOAPFhttpheaders) httpHeaders.set(s.setown(helper->getHttpHeaders())); if (flags & SOAPFxpathhints) @@ -1097,7 +1136,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface } StringBuffer secretName("http-connect-"); secretName.append(finger); - loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, true, wscType); + loadConnectSecret(vaultId, secretName, urlArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, true, wscType); } else { @@ -1108,11 +1147,11 @@ class CWSCHelper : implements IWSCHelper, public CInterface StringBuffer secretName; UrlArray tempArray; //TBD: If this is a list of URLs do we A. not check for a mapped secret, B. check the first one, C. Use long secret name including entire list - Url &url = urlArray.tos(); + Url &url = urlArray.item(0); url.getDynamicUrlSecretName(secretName); if (secretName.length()) { - if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, requireUrlsMappedToSecrets, wscType)) + if (loadConnectSecret(nullptr, secretName, tempArray, issuer, proxyAddress, persistEnabled, persistMaxRequests, requireUrlsMappedToSecrets, wscType)) { logctx.CTXLOG("Mapped %s URL!", wscCallTypeText()); if (tempArray.length()) @@ -1131,6 +1170,9 @@ class CWSCHelper : implements IWSCHelper, public CInterface if (numUrls == 0) throw MakeStringException(0, "%s specified no URLs", getWsCallTypeName(wscType)); + if (!persistentHandler) + persistEnabled = false; + if (!proxyAddress.isEmpty()) { UrlListParser proxyUrlListParser(proxyAddress); @@ -1316,6 +1358,8 @@ class CWSCHelper : implements IWSCHelper, public CInterface } inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; } inline const char * wscCallTypeText() const { return getWsCallTypeName(wscType); } + inline bool usePersistConnections() const { return persistEnabled; } + inline unsigned getPersistMaxRequests() const { return persistMaxRequests; } protected: friend class CWSCHelperThread; @@ -2407,11 +2451,14 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo checkTimeLimitExceeded(&remainingMS); Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0); ep.set(connUrl.host.get(), connUrl.port); + if (ep.isNull()) + throw MakeStringException(-1, "Failed to resolve host '%s'", nullText(connUrl.host.get())); + checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ... if (strieq(url.method, "https")) proto = PersistentProtocol::ProtoTLS; bool shouldClose = false; - Owned psock = persistentHandler?persistentHandler->getAvailable(&ep, &shouldClose, proto):nullptr; + Owned psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr; if (psock) { isReused = true; @@ -2521,10 +2568,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo processResponse(url, response, meta, contentType); delete meta; - if (persistentHandler) + if (master->usePersistConnections()) { if (isReused) - persistentHandler->doneUsing(socket, keepAlive); + persistentHandler->doneUsing(socket, keepAlive, master->getPersistMaxRequests()); else if (keepAlive) persistentHandler->add(socket, &ep, proto); } @@ -2532,7 +2579,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (IReceivedRoxieException *e) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); // server busy ... Sleep and retry if (e->errorCode() == 1001) @@ -2564,7 +2611,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (IException *e) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); if (master->timeLimitExceeded) { @@ -2597,7 +2644,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (std::exception & es) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); if(dynamic_cast(&es)) throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery"); @@ -2605,7 +2652,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } catch (...) { - if (persistentHandler && isReused) + if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); throw MakeStringException(-1, "Unknown exception in processQuery"); } diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 31c3d422a0e..195428a7e24 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -191,30 +191,30 @@ static IPropertyTree *getCostPropTree(const char *cluster) } } -extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays) +extern da_decl cost_type calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays) { Owned costPT = getCostPropTree(cluster); if (costPT==nullptr) - return 0.0; + return 0; double atRestPrice = costPT->getPropReal("@storageAtRest", 0.0); double storageCostDaily = atRestPrice * 12 / 365; - return storageCostDaily * sizeGB * fileAgeDays; + return money2cost_type(storageCostDaily * sizeGB * fileAgeDays); } -extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads) +extern da_decl cost_type calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads) { Owned costPT = getCostPropTree(cluster); if (costPT==nullptr) - return 0.0; + return 0; constexpr int accessPriceScalingFactor = 10000; // read/write pricing based on 10,000 operations double readPrice = costPT->getPropReal("@storageReads", 0.0); double writePrice = costPT->getPropReal("@storageWrites", 0.0); - return (readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor); + return money2cost_type((readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor)); } -extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads) +extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads) { StringBuffer clusterName; // Should really specify the cluster number too, but this is the best we can do for now @@ -4942,7 +4942,7 @@ protected: friend class CDistributedFilePart; else return false; } - virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override + virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) override { CDateTime dt; getModificationTime(dt); @@ -4956,7 +4956,7 @@ protected: friend class CDistributedFilePart; if (hasReadWriteCostFields(*attrs)) { // Newer files have readCost and writeCost attributes - accessCost = cost_type2money(attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost))); + accessCost = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)); } else { @@ -7041,12 +7041,12 @@ class CDistributedSuperFile: public CDistributedFileBase return false; } - virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override + virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) override { CriticalBlock block (sect); ForEachItemIn(i,subfiles) { - double tmpAtRestcost, tmpAccessCost; + cost_type tmpAtRestcost, tmpAccessCost; IDistributedFile &f = subfiles.item(i); f.getCost(cluster, tmpAtRestcost, tmpAccessCost); atRestCost += tmpAtRestcost; @@ -13454,7 +13454,7 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu else sizeDiskSize = file->getPropInt64(getDFUQResultFieldName(DFUQRForigsize), 0); double sizeGB = sizeDiskSize / ((double)1024 * 1024 * 1024); - cost_type atRestCost = money2cost_type(calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays)); + cost_type atRestCost = calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays); file->setPropInt64(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost); // Dyamically calc and set the access cost field and for legacy files set read/write cost fields diff --git a/dali/base/dadfs.hpp b/dali/base/dadfs.hpp index 6e5eff9e37e..c475f37a8ae 100644 --- a/dali/base/dadfs.hpp +++ b/dali/base/dadfs.hpp @@ -433,7 +433,7 @@ interface IDistributedFile: extends IInterface virtual bool getSkewInfo(unsigned &maxSkew, unsigned &minSkew, unsigned &maxSkewPart, unsigned &minSkewPart, bool calculateIfMissing) = 0; virtual int getExpire(StringBuffer *expirationDate) = 0; virtual void setExpire(int expireDays) = 0; - virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) = 0; + virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) = 0; }; @@ -889,9 +889,9 @@ extern da_decl void ensureFileScope(const CDfsLogicalFileName &dlfn, unsigned ti extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool readreq,bool createreq,bool allowquery,const char *specialnotallowedmsg); -extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays); -extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads); -extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads); +extern da_decl cost_type calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays); +extern da_decl cost_type calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads); +extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads); constexpr bool defaultPrivilegedUser = true; constexpr bool defaultNonPrivilegedUser = false; @@ -910,7 +910,7 @@ inline cost_type getLegacyReadCost(const IPropertyTree & fileAttr, Source source && !isFileKey(fileAttr)) { stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); - return money2cost_type(calcFileAccessCost(source, 0, prevDiskReads)); + return calcFileAccessCost(source, 0, prevDiskReads); } else return 0; @@ -922,7 +922,7 @@ inline cost_type getLegacyWriteCost(const IPropertyTree & fileAttr, Source sourc if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskWrites))) { stat_type prevDiskWrites = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0); - return money2cost_type(calcFileAccessCost(source, prevDiskWrites, 0)); + return calcFileAccessCost(source, prevDiskWrites, 0); } else return 0; diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index f2221cbf7e5..ce63df31a7a 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -228,7 +228,7 @@ class CLCLockBlock : public CInterface if (lock.lockWrite(timeout)) break; } - PROGLOG("CLCLockBlock(write=%d) timeout %s(%d), took %d ms",!readLock,fname,lnum,got-msTick()); + PROGLOG("CLCLockBlock(write=%d) timeout %s(%d), took %d ms",!readLock,fname,lnum,msTick()-got); if (readWriteStackTracing) PrintStackReport(); } diff --git a/dali/daliadmin/daliadmin.cpp b/dali/daliadmin/daliadmin.cpp index 474580c01d4..26c002fbb0f 100644 --- a/dali/daliadmin/daliadmin.cpp +++ b/dali/daliadmin/daliadmin.cpp @@ -726,10 +726,10 @@ static void testDFSFile(IDistributedFile *legacyDfsFile, const char *logicalName PROGLOG("expire: %d", expire); try { - double atRestCost, accessCost; + cost_type atRestCost, accessCost; legacyDfsFile->getCost(clusterName.str(), atRestCost, accessCost); - PROGLOG("accessCost: %f", accessCost); - PROGLOG("atRestCost: %f", atRestCost); + PROGLOG("accessCost: %f", cost_type2money(accessCost)); + PROGLOG("atRestCost: %f", cost_type2money(atRestCost)); } catch(IException *e) { diff --git a/dali/dfu/dfurun.cpp b/dali/dfu/dfurun.cpp index 36de1b7a9eb..add2dfe81ea 100644 --- a/dali/dfu/dfurun.cpp +++ b/dali/dfu/dfurun.cpp @@ -143,7 +143,7 @@ class CDFUengine: public CInterface, implements IDFUengine DaftProgress::setRange(sizeReadBefore,totalSize,_totalNodes); progress->setTotalNodes(_totalNodes); } - void setFileAccessCost(double fileAccessCost) + void setFileAccessCost(cost_type fileAccessCost) { progress->setFileAccessCost(fileAccessCost); } diff --git a/dali/dfu/dfuwu.cpp b/dali/dfu/dfuwu.cpp index 26306e8db90..0da31062b43 100644 --- a/dali/dfu/dfuwu.cpp +++ b/dali/dfu/dfuwu.cpp @@ -779,7 +779,7 @@ class CDFUprogress: public CLinkedDFUWUchild, implements IDFUprogress queryRoot()->getProp("@subdone",str); return str; } - double getFileAccessCost() const + cost_type getFileAccessCost() const { CriticalBlock block(parent->crit); return queryRoot()->getPropInt64("@fileAccessCost"); @@ -795,10 +795,10 @@ class CDFUprogress: public CLinkedDFUWUchild, implements IDFUprogress CriticalBlock block(parent->crit); queryRoot()->setProp("@subdone",str); } - void setFileAccessCost(double fileAccessCost) + void setFileAccessCost(cost_type fileAccessCost) { CriticalBlock block(parent->crit); - queryRoot()->setPropReal("@fileAccessCost", fileAccessCost); + queryRoot()->setPropInt64("@fileAccessCost", fileAccessCost); } unsigned incPublisherTaskCount() { diff --git a/dali/dfu/dfuwu.hpp b/dali/dfu/dfuwu.hpp index 923630a719c..770af07c85a 100644 --- a/dali/dfu/dfuwu.hpp +++ b/dali/dfu/dfuwu.hpp @@ -316,7 +316,7 @@ interface IConstDFUprogress: extends IInterface virtual unsigned getTotalNodes() const = 0; virtual StringBuffer &getSubInProgress(StringBuffer &str) const = 0; // sub-DFUWUs in progress virtual StringBuffer &getSubDone(StringBuffer &str) const = 0; // sub-DFUWUs done (list) - virtual double getFileAccessCost() const = 0; + virtual cost_type getFileAccessCost() const = 0; virtual unsigned getPublisherTaskCount() const = 0; }; @@ -333,7 +333,7 @@ interface IDFUprogress: extends IConstDFUprogress virtual void setSubInProgress(const char *str) = 0; // set sub-DFUWUs in progress virtual void setSubDone(const char *str) = 0; // set sub-DFUWUs done virtual void clearProgress() = 0; - virtual void setFileAccessCost(double fileAccessCost) = 0; + virtual void setFileAccessCost(cost_type fileAccessCost) = 0; virtual unsigned incPublisherTaskCount() = 0; }; diff --git a/dali/ft/daft.hpp b/dali/ft/daft.hpp index 39147db875e..84bc0d85860 100644 --- a/dali/ft/daft.hpp +++ b/dali/ft/daft.hpp @@ -33,7 +33,7 @@ interface IDaftProgress { virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0; // how much has been done virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned totalNodes) = 0; // how much has been done - virtual void setFileAccessCost(double fileAccessCost) = 0; + virtual void setFileAccessCost(cost_type fileAccessCost) = 0; }; interface IDaftCopyProgress diff --git a/dali/ft/daftprogress.hpp b/dali/ft/daftprogress.hpp index e6b0468002c..d76b1f24210 100644 --- a/dali/ft/daftprogress.hpp +++ b/dali/ft/daftprogress.hpp @@ -33,7 +33,7 @@ class DALIFT_API DaftProgress : public IDaftProgress unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0; virtual void displaySummary(const char * timeTaken, unsigned kbPerSecond) = 0; virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned _totalNodes); - virtual void setFileAccessCost(double fileAccessCost) = 0; + virtual void setFileAccessCost(cost_type fileAccessCost) = 0; protected: void formatTime(char * buffer, unsigned secs); @@ -59,7 +59,7 @@ class DALIFT_API DemoProgress : public DaftProgress unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale, unsigned kbPerSecondAve, unsigned kbPerSecondRate, unsigned numNodes); virtual void displaySummary(const char * timeTaken, unsigned kbPerSecond); - virtual void setFileAccessCost(double fileAccessCost) {}; + virtual void setFileAccessCost(cost_type fileAccessCost) {}; }; #endif diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index 3a08d82d21a..b9feea834ef 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -3387,7 +3387,7 @@ void FileSprayer::spray() updateTargetProperties(); //Calculate and store file access cost - double fileAccessCost = 0.0; + cost_type fileAccessCost = 0; if (distributedTarget) { StringBuffer cluster; @@ -3592,7 +3592,7 @@ void FileSprayer::updateTargetProperties() DistributedFilePropertyLock lock(distributedTarget); IPropertyTree &curProps = lock.queryAttributes(); - cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0)); + cost_type writeCost = calcFileAccessCost(distributedTarget, totalNumWrites, 0); curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost); curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites); @@ -3778,7 +3778,7 @@ void FileSprayer::updateTargetProperties() { IPropertyTree & fileAttr = distributedSource->queryAttributes(); cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource); - cost_type curReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads)); + cost_type curReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads); distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost); distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads); } diff --git a/docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-SOAPCALL.xml b/docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-SOAPCALL.xml index 8d3d648cb03..a12b082a444 100644 --- a/docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-SOAPCALL.xml +++ b/docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-SOAPCALL.xml @@ -216,11 +216,11 @@ option - Optional. If omitted, it uses the default number of - connections. If TRUE, it enables persistent connections. If FALSE + Optional. If omitted, it uses the default number of requests per + connection. If TRUE, it enables persistent connections. If FALSE or 0, it disables persistent connections. If set to an integer, it - enables persistent connections and sets the number of active - connections. + enables persistent connections and sets the maximum number of + requests each connection will be used for. diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/AdjustTimeTZ.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/AdjustTimeTZ.xml index a63ac60c576..a05207d95ba 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/AdjustTimeTZ.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/AdjustTimeTZ.xml @@ -2,8 +2,6 @@ - - AdjustTimeTZ STD.Date.TimeZone.AdjustTimeTZ diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/FindTZData.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/FindTZData.xml index 29908ddc1a1..8e7c564f67a 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/FindTZData.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/FindTZData.xml @@ -2,8 +2,6 @@ - - FindTZData STD.Date.TimeZone.FindTZData diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SecondsBetweenTZ.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SecondsBetweenTZ.xml index c566b01525c..14cc851c5a8 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SecondsBetweenTZ.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SecondsBetweenTZ.xml @@ -2,8 +2,6 @@ - - SecondsBetweenTZ STD.Date.TimeZone.SecondsBetweenTZ diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/TZDataForLocation.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/TZDataForLocation.xml index 2c46616b08e..44c47afa8ca 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/TZDataForLocation.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/TZDataForLocation.xml @@ -2,8 +2,6 @@ - - TZDataForLocation STD.Date.TimeZone.TZDataForLocation @@ -26,7 +24,7 @@ location REQUIRED. The name of the location to search for; must be a - non-empty uppercase string. + non-empty uppercase string. diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToLocalTime.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToLocalTime.xml index 97edf4bc37d..adceb0e549e 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToLocalTime.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToLocalTime.xml @@ -2,8 +2,6 @@ - - ToLocalTime STD.Date.TimeZone.ToLocalTime diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToUTCTime.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToUTCTime.xml index 2fc41acf239..d136bfe087f 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToUTCTime.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/ToUTCTime.xml @@ -2,8 +2,6 @@ - - ToUTCTime STD.Date.TimeZone.ToUTCTime diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/UniqueTZAbbreviations.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/UniqueTZAbbreviations.xml index f8fcaf32571..b655947bad5 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/UniqueTZAbbreviations.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/UniqueTZAbbreviations.xml @@ -2,8 +2,6 @@ - - UniqueTZAbbreviations - - UniqueTZLocations STD.Date.TimeZone.UniqueTZLocations diff --git a/ecl/hqlcpp/hqlhtcpp.cpp b/ecl/hqlcpp/hqlhtcpp.cpp index 09c2813ddfc..f2939a87576 100644 --- a/ecl/hqlcpp/hqlhtcpp.cpp +++ b/ecl/hqlcpp/hqlhtcpp.cpp @@ -18242,7 +18242,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre else if (matchesBoolean(persistArg, true)) persistArg = nullptr; else if (!matchesConstantValue(persistArg, 0)) // Avoid generating 0 since that is the default implementation - doBuildUnsignedFunction(instance->createctx, "getPersistPoolSize", persistArg); + doBuildUnsignedFunction(instance->createctx, "getPersistMaxRequests", persistArg); } //virtual unsigned getFlags() @@ -18298,7 +18298,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre { flags.append("|SOAPFpersist"); if (persistArg) - flags.append("|SOAPFpersistPool"); + flags.append("|SOAPFpersistMax"); } if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1); diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index 3951a5b632a..036d6beb263 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -789,7 +789,7 @@ void CHThorDiskWriteActivity::publish() { StringBuffer clusterName; file->getClusterName(0, clusterName); - diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0)); + diskAccessCost = calcFileAccessCost(clusterName, numDiskWrites, 0); properties.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost); } file->attach(logicalName.get(), agent.queryCodeContext()->queryUserDescriptor()); @@ -1437,7 +1437,7 @@ void CHThorIndexWriteActivity::execute() StringBuffer clusterName; dfile->getClusterName(0, clusterName); - diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0)); + diskAccessCost = calcFileAccessCost(clusterName, numDiskWrites, 0); properties.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost); } else @@ -8543,7 +8543,7 @@ void CHThorDiskReadBaseActivity::closepart() } IPropertyTree & fileAttr = dFile->queryAttributes(); cost_type legacyReadCost = getLegacyReadCost(fileAttr, dFile); - cost_type curReadCost = money2cost_type(calcFileAccessCost(dFile, 0, curDiskReads)); + cost_type curReadCost = calcFileAccessCost(dFile, 0, curDiskReads); dFile->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost); dFile->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads); diff --git a/esp/clients/ws_dfsclient/ws_dfsclient.cpp b/esp/clients/ws_dfsclient/ws_dfsclient.cpp index 8270b912a8b..3aa39d35bbd 100644 --- a/esp/clients/ws_dfsclient/ws_dfsclient.cpp +++ b/esp/clients/ws_dfsclient/ws_dfsclient.cpp @@ -235,7 +235,7 @@ class CServiceDistributedFileBase : public CSimpleInterfaceOf virtual bool isExternal() const override { return false; } virtual bool getSkewInfo(unsigned &maxSkew, unsigned &minSkew, unsigned &maxSkewPart, unsigned &minSkewPart, bool calculateIfMissing) override { return legacyDFSFile->getSkewInfo(maxSkew, minSkew, maxSkewPart, minSkewPart, calculateIfMissing); } virtual int getExpire(StringBuffer *expirationDate) override { return legacyDFSFile->getExpire(expirationDate); } - virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override { legacyDFSFile->getCost(cluster, atRestCost, accessCost); } + virtual void getCost(const char * cluster, cost_type & atRestCost, cost_type & accessCost) override { legacyDFSFile->getCost(cluster, atRestCost, accessCost); } // setters (change file meta data) virtual void setPreferredClusters(const char *clusters) override { legacyDFSFile->setPreferredClusters(clusters); } diff --git a/esp/platform/espp.cpp b/esp/platform/espp.cpp index 18477717f7d..a9f9db19129 100644 --- a/esp/platform/espp.cpp +++ b/esp/platform/espp.cpp @@ -458,7 +458,7 @@ int init_main(int argc, const char* argv[]) // legacy esp.xml will contain a generated global section if present in the environment. // replace the empty stub created by loadConfiguration with this environment globals section. - Owned global = envpt->queryPropTree("global"); + Owned global = envpt->getPropTree("global"); if (global) { Owned currentConfig = getComponentConfig(); diff --git a/esp/services/ws_dfu/ws_dfuService.cpp b/esp/services/ws_dfu/ws_dfuService.cpp index e9409c5a553..ad3696f03e5 100644 --- a/esp/services/ws_dfu/ws_dfuService.cpp +++ b/esp/services/ws_dfu/ws_dfuService.cpp @@ -2667,15 +2667,15 @@ void CWsDfuEx::doGetFileDetails(IEspContext &context, IUserDescriptor *udesc, co } if (version >= 1.60) { - double atRestCost, accessCost; + cost_type atRestCost, accessCost; df->getCost(cluster, atRestCost, accessCost); if (version <= 1.61) - FileDetails.setCost(atRestCost+accessCost); + FileDetails.setCost(cost_type2money(atRestCost+accessCost)); else { - FileDetails.setAccessCost(accessCost); - FileDetails.setAtRestCost(atRestCost); + FileDetails.setAccessCost(cost_type2money(accessCost)); + FileDetails.setAtRestCost(cost_type2money(atRestCost)); } } if (version >= 1.65) diff --git a/esp/services/ws_fs/ws_fsService.cpp b/esp/services/ws_fs/ws_fsService.cpp index 691bd2db6a1..b23d4347464 100644 --- a/esp/services/ws_fs/ws_fsService.cpp +++ b/esp/services/ws_fs/ws_fsService.cpp @@ -353,7 +353,7 @@ static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWork if(secs > 0) dest.setSecsLeft(secs); dest.setPercentDone(prog->getPercentDone()); - dest.setFileAccessCost(prog->getFileAccessCost()); + dest.setFileAccessCost(cost_type2money(prog->getFileAccessCost())); } IConstDFUoptions *options = src->queryOptions(); diff --git a/esp/src/src-react/components/DFUWorkunitDetails.tsx b/esp/src/src-react/components/DFUWorkunitDetails.tsx index e959727a081..1a18a580c86 100644 --- a/esp/src/src-react/components/DFUWorkunitDetails.tsx +++ b/esp/src/src-react/components/DFUWorkunitDetails.tsx @@ -4,6 +4,7 @@ import { scopedLogger } from "@hpcc-js/util"; import { SizeMe } from "react-sizeme"; import nlsHPCC from "src/nlsHPCC"; import * as FileSpray from "src/FileSpray"; +import { formatCost } from "src/Session"; import { useConfirm } from "../hooks/confirm"; import { useDfuWorkunit } from "../hooks/workunit"; import { pivotItemStyle } from "../layouts/pivot"; @@ -213,6 +214,7 @@ export const DFUWorkunitDetails: React.FunctionComponent = (props if (editor) { if (theme.semanticColors.link === darkTheme.palette.themePrimary) { - editor.setOption("theme", "darcula"); + editor.option("theme", "darcula"); } else { - editor.setOption("theme", "default"); + editor.option("theme", "default"); } } }, [wuid, editor, theme]); @@ -404,9 +404,9 @@ export const ECLPlayground: React.FunctionComponent = (props const handleThemeToggle = React.useCallback((evt) => { if (!editor) return; if (evt.detail && evt.detail.dark === true) { - editor.setOption("theme", "darcula"); + editor.option("theme", "darcula"); } else { - editor.setOption("theme", "default"); + editor.option("theme", "default"); } }, [editor]); useOnEvent(document, "eclwatch-theme-toggle", handleThemeToggle); diff --git a/esp/src/src-react/components/IndexFileSummary.tsx b/esp/src/src-react/components/IndexFileSummary.tsx index da78d4fc6ed..54f52605e97 100644 --- a/esp/src/src-react/components/IndexFileSummary.tsx +++ b/esp/src/src-react/components/IndexFileSummary.tsx @@ -1,7 +1,6 @@ import * as React from "react"; import { CommandBar, ContextualMenuItemType, ICommandBarItemProps, ScrollablePane, ScrollbarVisibility, Sticky, StickyPositionType } from "@fluentui/react"; import { DFUService, WsDfu } from "@hpcc-js/comms"; -import { format as d3Format } from "@hpcc-js/common"; import { scopedLogger } from "@hpcc-js/util"; import nlsHPCC from "src/nlsHPCC"; import { formatCost } from "src/Session"; @@ -21,8 +20,6 @@ const logger = scopedLogger("src-react/components/IndexFileSummary.tsx"); import "react-reflex/styles.css"; -const format = d3Format(",.2f"); - const dfuService = new DFUService({ baseUrl: "" }); interface IndexFileSummaryProps { @@ -175,7 +172,7 @@ export const IndexFileSummary: React.FunctionComponent = "Filesize": { label: nlsHPCC.FileSize, type: "string", value: file?.Filesize, readonly: true }, "Format": { label: nlsHPCC.Format, type: "string", value: file?.Format, readonly: true }, "IsCompressed": { label: nlsHPCC.IsCompressed, type: "checkbox", value: file?.IsCompressed, readonly: true }, - "CompressedFileSizeString": { label: nlsHPCC.CompressedFileSize, type: "string", value: file?.CompressedFileSize ? file?.CompressedFileSize.toString() : "", readonly: true }, + "CompressedFileSizeString": { label: nlsHPCC.CompressedFileSize, type: "string", value: file?.CompressedFileSize ? Utility.safeFormatNum(file?.CompressedFileSize) : "", readonly: true }, "PercentCompressed": { label: nlsHPCC.PercentCompressed, type: "string", value: file?.PercentCompressed, readonly: true }, "Modified": { label: nlsHPCC.Modified, type: "string", value: file?.Modified, readonly: true }, "ExpirationDate": { label: nlsHPCC.ExpirationDate, type: "string", value: file?.ExpirationDate, readonly: true }, @@ -237,21 +234,21 @@ export const IndexFileSummary: React.FunctionComponent = label: nlsHPCC.File, originalSize: Utility.convertedSize(file?.FileSizeInt64), diskSize: Utility.convertedSize(file?.CompressedFileSize || file?.FileSizeInt64), - percentCompressed: ((file?.CompressedFileSize && file?.FileSizeInt64) ? format(100 * file?.CompressedFileSize / file?.FileSizeInt64) : 0) + "%", + percentCompressed: ((file?.CompressedFileSize && file?.FileSizeInt64) ? Utility.formatDecimal(100 * file?.CompressedFileSize / file?.FileSizeInt64) : 0) + "%", memorySize: (file?.ExtendedIndexInfo?.SizeMemoryBranches && file?.ExtendedIndexInfo?.SizeMemoryLeaves) ? Utility.convertedSize(file?.ExtendedIndexInfo?.SizeMemoryBranches + file?.ExtendedIndexInfo?.SizeMemoryLeaves) : "" }, { label: nlsHPCC.Branches, originalSize: Utility.convertedSize(file?.ExtendedIndexInfo?.SizeOriginalBranches) ?? "", diskSize: Utility.convertedSize(file?.ExtendedIndexInfo?.SizeDiskBranches) ?? "", - percentCompressed: file?.ExtendedIndexInfo?.BranchCompressionPercent ? format(file.ExtendedIndexInfo.BranchCompressionPercent) + "%" : "", + percentCompressed: file?.ExtendedIndexInfo?.BranchCompressionPercent ? Utility.formatDecimal(file.ExtendedIndexInfo.BranchCompressionPercent) + "%" : "", memorySize: Utility.convertedSize(file?.ExtendedIndexInfo?.SizeMemoryBranches) ?? "" }, { label: nlsHPCC.Data, originalSize: Utility.convertedSize(file?.ExtendedIndexInfo?.SizeOriginalData) ?? "", diskSize: (file?.ExtendedIndexInfo?.SizeDiskLeaves !== undefined && file?.ExtendedIndexInfo?.SizeDiskBlobs !== undefined) ? Utility.convertedSize(file?.ExtendedIndexInfo?.SizeDiskLeaves + file?.ExtendedIndexInfo?.SizeDiskBlobs) : "", - percentCompressed: file?.ExtendedIndexInfo?.DataCompressionPercent ? format(file.ExtendedIndexInfo.DataCompressionPercent) + "%" : "", + percentCompressed: file?.ExtendedIndexInfo?.DataCompressionPercent ? Utility.formatDecimal(file.ExtendedIndexInfo.DataCompressionPercent) + "%" : "", memorySize: Utility.convertedSize(file?.ExtendedIndexInfo?.SizeMemoryLeaves) ?? "" } ]} diff --git a/esp/src/src-react/components/LogicalFileSummary.tsx b/esp/src/src-react/components/LogicalFileSummary.tsx index 0416e253424..469f2c57b83 100644 --- a/esp/src/src-react/components/LogicalFileSummary.tsx +++ b/esp/src/src-react/components/LogicalFileSummary.tsx @@ -1,6 +1,5 @@ import * as React from "react"; import { CommandBar, ContextualMenuItemType, ICommandBarItemProps, MessageBar, MessageBarType, ScrollablePane, ScrollbarVisibility, Sticky, StickyPositionType } from "@fluentui/react"; -import { format as d3Format } from "@hpcc-js/common"; import { DFUService, WsDfu } from "@hpcc-js/comms"; import { scopedLogger } from "@hpcc-js/util"; import nlsHPCC from "src/nlsHPCC"; @@ -29,8 +28,6 @@ interface LogicalFileSummaryProps { tab?: string; } -const formatInt = d3Format(","); - export const LogicalFileSummary: React.FunctionComponent = ({ cluster, logicalFile, @@ -196,7 +193,7 @@ export const LogicalFileSummary: React.FunctionComponent = ({ }); }, [lineage, selectedLineage]); - const graphComponent = React.useMemo(() => { - return - - setSelectedLineage(lineage.find(l => l.id === item.id))} /> - } - main={<> - - - - - - } - />; - }, [graphButtons, graphRightButtons, breadcrumbs, selectedLineage?.id, spinnerLabel, selectedMetrics.length, metricGraphWidget, lineage]); - // Props Table --- const propsTable = useConst(() => new Table() - .id("propsTable") .columns([nlsHPCC.Property, nlsHPCC.Value, "Avg", "Min", "Max", "Delta", "StdDev", "SkewMin", "SkewMax", "NodeMin", "NodeMax"]) .columnWidth("auto") ); @@ -485,7 +464,6 @@ export const Metrics: React.FunctionComponent = ({ }, [propsTable]); const propsTable2 = useConst(() => new Table() - .id("propsTable2") .columns([nlsHPCC.Property, nlsHPCC.Value]) .columnWidth("auto") ); @@ -514,14 +492,6 @@ export const Metrics: React.FunctionComponent = ({ ; }, [propsTable2]); - const portal = useConst(() => new ReactWidget() - .id("portal") - ); - - React.useEffect(() => { - portal.children(

{timelineFilter}

).lazyRender(); - }, [portal, timelineFilter]); - React.useEffect(() => { const dot = metricGraph.graphTpl(selectedLineage ? [selectedLineage] : [], options); setDot(dot); @@ -561,38 +531,6 @@ export const Metrics: React.FunctionComponent = ({ setSelectedMetricsPtr(0); }, [metrics, selection]); - const items: DockPanelItems = React.useMemo((): DockPanelItems => { - return [ - { - key: "scopesTable", - title: nlsHPCC.Metrics, - component: } - main={} - /> - }, - { - key: "metricGraph", - title: nlsHPCC.Graph, - component: graphComponent, - location: "split-right", - ref: "scopesTable" - }, - { - title: nlsHPCC.Properties, - widget: propsTable, - location: "split-bottom", - ref: "scopesTable" - }, - { - title: nlsHPCC.CrossTab, - widget: propsTable2, - location: "tab-after", - ref: propsTable.id() - } - ]; - }, [scopeFilter, onChangeScopeFilter, scopesTable, graphComponent, propsTable, propsTable2]); - React.useEffect(() => { // Update layout prior to unmount --- @@ -685,7 +623,39 @@ export const Metrics: React.FunctionComponent = ({ } main={ - + + + } + main={} + /> + + + + + setSelectedLineage(lineage.find(l => l.id === item.id))} /> + } + main={<> + + + + + + } + /> + + + + + + + + } diff --git a/esp/src/src-react/components/Scopes.tsx b/esp/src/src-react/components/Scopes.tsx index 5c89c66e0b0..b03b33789b7 100644 --- a/esp/src/src-react/components/Scopes.tsx +++ b/esp/src/src-react/components/Scopes.tsx @@ -7,7 +7,6 @@ import { formatCost } from "src/Session"; import * as Utility from "src/Utility"; import nlsHPCC from "src/nlsHPCC"; import { useConfirm } from "../hooks/confirm"; -import { useBuildInfo } from "../hooks/platform"; import { useUserTheme } from "../hooks/theme"; import { useMyAccount } from "../hooks/user"; import { HolyGrail } from "../layouts/HolyGrail"; @@ -101,7 +100,6 @@ export const Scopes: React.FunctionComponent = ({ const { currentUser } = useMyAccount(); const [viewByScope, setViewByScope] = React.useState(true); const [uiState, setUIState] = React.useState({ ...defaultUIState }); - const [, { currencyCode }] = useBuildInfo(); const { selection, setSelection, setTotal, @@ -197,14 +195,14 @@ export const Scopes: React.FunctionComponent = ({ Modified: { label: nlsHPCC.ModifiedUTCGMT, width: 162 }, AtRestCost: { label: nlsHPCC.FileCostAtRest, width: 100, - formatter: (cost, row) => `${formatCost(cost ?? 0)} (${currencyCode || "$"})` + formatter: (cost, row) => `${formatCost(cost ?? 0)}` }, AccessCost: { label: nlsHPCC.FileAccessCost, width: 100, - formatter: (cost, row) => `${formatCost(cost ?? 0)} (${currencyCode || "$"})` + formatter: (cost, row) => `${formatCost(cost ?? 0)}` } }; - }, [currencyCode, scopePath]); + }, [scopePath]); const [DeleteConfirm, setShowDeleteConfirm] = useConfirm({ title: nlsHPCC.Delete, diff --git a/esp/src/src-react/components/SourceEditor.tsx b/esp/src/src-react/components/SourceEditor.tsx index e2a4060741d..6a367c39c71 100644 --- a/esp/src/src-react/components/SourceEditor.tsx +++ b/esp/src/src-react/components/SourceEditor.tsx @@ -76,9 +76,9 @@ export const SourceEditor: React.FunctionComponent = ({ const handleThemeToggle = React.useCallback((evt) => { if (!editor) return; if (evt.detail && evt.detail.dark === true) { - editor.setOption("theme", "darcula"); + editor.option("theme", "darcula"); } else { - editor.setOption("theme", "default"); + editor.option("theme", "default"); } }, [editor]); useOnEvent(document, "eclwatch-theme-toggle", handleThemeToggle); diff --git a/esp/src/src-react/components/SourceFiles.tsx b/esp/src/src-react/components/SourceFiles.tsx index 7d1e7cc70de..e33a53c3019 100644 --- a/esp/src/src-react/components/SourceFiles.tsx +++ b/esp/src/src-react/components/SourceFiles.tsx @@ -54,10 +54,14 @@ export const SourceFiles: React.FunctionComponent = ({ Name: { label: "Name", sortable: true, formatter: (Name, row) => { + let fileUrl = `#/files/${Name}`; + if (row?.FileCluster) { + fileUrl = `#/files/${row.FileCluster}/${Name}`; + } return <>   - {Name} + {Name} ; } }, @@ -86,10 +90,18 @@ export const SourceFiles: React.FunctionComponent = ({ key: "open", text: nlsHPCC.Open, disabled: !uiState.hasSelection, iconProps: { iconName: "WindowEdit" }, onClick: () => { if (selection.length === 1) { - window.location.href = `#/files/${selection[0].Name}`; + let fileUrl = `#/files/${selection[0].Name}`; + if (selection[0]?.FileCluster) { + fileUrl = `#/files/${selection[0].FileCluster}/${selection[0].Name}`; + } + window.location.href = fileUrl; } else { for (let i = selection.length - 1; i >= 0; --i) { - window.open(`#/files/${selection[i].Name}`, "_blank"); + let fileUrl = `#/files/${selection[i].Name}`; + if (selection[i]?.FileCluster) { + fileUrl = `#/files/${selection[i].FileCluster}/${selection[i].Name}`; + } + window.open(fileUrl, "_blank"); } } } diff --git a/esp/src/src-react/components/Title.tsx b/esp/src/src-react/components/Title.tsx index 1cd66f3f2a8..ec472bbc373 100644 --- a/esp/src/src-react/components/Title.tsx +++ b/esp/src/src-react/components/Title.tsx @@ -65,6 +65,10 @@ export const DevTitle: React.FunctionComponent = ({ const [showBannerConfig, setShowBannerConfig] = React.useState(false); const [BannerMessageBar, BannerConfig] = useBanner({ showForm: showBannerConfig, setShowForm: setShowBannerConfig }); + const titlebarColorSet = React.useMemo(() => { + return titlebarColor && titlebarColor !== theme.palette.themeLight; + }, [theme.palette, titlebarColor]); + const personaProps: IPersonaSharedProps = React.useMemo(() => { return { text: (currentUser?.firstName && currentUser?.lastName) ? currentUser.firstName + " " + currentUser.lastName : currentUser?.username, @@ -181,12 +185,12 @@ export const DevTitle: React.FunctionComponent = ({ background: "transparent", minWidth: 48, padding: "0 10px 0 4px", - color: theme.semanticColors.link + color: titlebarColor ? Utility.textColor(titlebarColor) : theme.semanticColors.link }, errorsWarningsCount: { margin: "-3px 0 0 -3px" } - }), [theme.semanticColors.link]); + }), [theme.semanticColors.link, titlebarColor]); React.useEffect(() => { switch (log.reduce((prev, cur) => Math.max(prev, cur.level), Level.debug)) { @@ -248,18 +252,18 @@ export const DevTitle: React.FunctionComponent = ({ document.title = environmentTitle; }, [environmentTitle]); - return
+ return
- + - + {showEnvironmentTitle && environmentTitle.length ? environmentTitle : "ECL Watch"} @@ -283,7 +287,7 @@ export const DevTitle: React.FunctionComponent = ({ - + void) { const retVal = super.render(); if (this._origLayout === undefined) { this._origLayout = formatLayout(this.layout()); } + if (callback) { + callback(this); + } return retVal; } @@ -160,36 +144,44 @@ export class ResetableDockPanel extends HPCCDockPanel { } } -export type DockPanelItems = (DockPanelWidget | DockPanelComponent)[]; +interface DockPanelItemProps { + key: string; + title: string; + location?: "split-top" | "split-left" | "split-right" | "split-bottom" | "tab-before" | "tab-after"; + relativeTo?: string; + closable?: boolean | IClosable; + children: JSX.Element; +} + +export const DockPanelItem: React.FunctionComponent = ({ + children +}) => { + return <>{children}; +}; interface DockPanelProps { - items?: DockPanelItems; layout?: object; + hideSingleTabs?: boolean; onDockPanelCreate?: (dockpanel: ResetableDockPanel) => void; + children?: React.ReactElement | React.ReactElement[]; } export const DockPanel: React.FunctionComponent = ({ - items = [], layout, - onDockPanelCreate + hideSingleTabs, + onDockPanelCreate, + children }) => { - + const items = React.useMemo(() => { + if (children === undefined) return []; + return Array.isArray(children) ? children : [children]; + }, [children]); + const [prevItems, setPrevItems] = React.useState[]>([]); const { theme, themeV9 } = useUserTheme(); - const [idx, setIdx] = React.useState<{ [key: string]: Widget }>({}); + const idx = useConst(() => new Map()); const dockPanel = useConst(() => { const retVal = new ResetableDockPanel(); - const idx: { [key: string]: Widget } = {}; - items.forEach(item => { - if (isDockPanelComponent(item)) { - idx[item.key] = new ReactWidget().id(item.key); - retVal.addWidget(idx[item.key], item.title, item.location, idx[item.ref], item.closable); - } else if (item.widget) { - idx[item.widget.id()] = item.widget; - retVal.addWidget(item.widget, item.title, item.location, idx[item.ref], item.closable); - } - }); - setIdx(idx); if (onDockPanelCreate) { setTimeout(() => { onDockPanelCreate(retVal); @@ -198,6 +190,35 @@ export const DockPanel: React.FunctionComponent = ({ return retVal; }); + React.useEffect(() => { + dockPanel?.hideSingleTabs(hideSingleTabs); + }, [dockPanel, hideSingleTabs]); + + React.useEffect(() => { + const diffs = compare2(prevItems, items, item => item.key); + diffs.exit.forEach(item => { + idx.delete(item.key); + dockPanel.removeWidget(idx.get(item.key)); + }); + diffs.enter.forEach(item => { + const reactWidget = new ReactWidget().id(item.key); + dockPanel.addWidget(reactWidget, item.props.title, item.props.location, idx.get(item.props.relativeTo), item.props.closable); + idx.set(item.key, reactWidget); + }); + [...diffs.enter, ...diffs.update].forEach(item => { + const reactWidget = idx.get(item.key); + if (reactWidget) { + reactWidget + .theme(theme) + .themeV9(themeV9) + .children(item.props.children) + ; + } + }); + dockPanel.render(); + setPrevItems(items); + }, [prevItems, dockPanel, idx, items, theme, themeV9]); + React.useEffect(() => { if (layout === undefined) { dockPanel?.resetLayout(); @@ -206,16 +227,5 @@ export const DockPanel: React.FunctionComponent = ({ } }, [dockPanel, layout]); - React.useEffect(() => { - items.filter(isDockPanelComponent).forEach(item => { - (idx[item.key] as ReactWidget) - .theme(theme) - .themeV9(themeV9) - .children(item.component) - .render() - ; - }); - }, [idx, items, theme, themeV9]); - return ; }; diff --git a/esp/src/src/ECLArchiveWidget.ts b/esp/src/src/ECLArchiveWidget.ts index 879efbfa4fd..e1ac7eb193b 100644 --- a/esp/src/src/ECLArchiveWidget.ts +++ b/esp/src/src/ECLArchiveWidget.ts @@ -59,9 +59,9 @@ export class ECLArchiveWidget { setEditorTheme() { if (themeIsDark()) { - this.editor.setOption("theme", "darcula"); + this.editor.option("theme", "darcula"); } else { - this.editor.setOption("theme", "default"); + this.editor.option("theme", "default"); } } @@ -108,9 +108,9 @@ export class ECLArchiveWidget { const handleThemeToggle = (evt) => { if (!context.editor) return; if (evt.detail && evt.detail.dark === true) { - context.editor.setOption("theme", "darcula"); + context.editor.option("theme", "darcula"); } else { - context.editor.setOption("theme", "default"); + context.editor.option("theme", "default"); } }; document.addEventListener("eclwatch-theme-toggle", handleThemeToggle); diff --git a/esp/src/src/Utility.ts b/esp/src/src/Utility.ts index 0c225ad67c0..bc34904d288 100644 --- a/esp/src/src/Utility.ts +++ b/esp/src/src/Utility.ts @@ -1093,7 +1093,7 @@ export function deleteCookie(name: string) { } const d3FormatDecimal = d3Format(",.2f"); -const d3FormatInt = d3Format(","); +const d3FormatInt = d3Format(",.0f"); export function formatDecimal(num: number): string { if (!num) return ""; diff --git a/helm/examples/tracing/README.md b/helm/examples/tracing/README.md index 0c9676891f0..672d27e0dba 100644 --- a/helm/examples/tracing/README.md +++ b/helm/examples/tracing/README.md @@ -11,26 +11,27 @@ All configuration options detailed here are part of the HPCC Systems Helm chart, - disabled - (default: false) disables tracking and reporting of internal traces and spans - alwaysCreateGlobalIds - If true, assign newly created global ID to any requests that do not supply one. - optAlwaysCreateTraceIds - If true components generate trace/span ids if none are provided by the remote caller. -- exporter - Defines The type of exporter in charge of forwarding span data to target back-end - - type - (default: JLOG) "OTLP-HTTP" | "OTLP-GRPC" | "OS" | "JLOG" | "NONE" - - JLOG - - logSpanDetails - Log span details such as description, status, kind - - logParentInfo - Log the span's parent info such as ParentSpanId, and TraceState - - logAttributes - Log the span's attributes - - logEvents - Log the span's events - - logLinks - Log the span's links - - logResources - Log the span's resources such as telemetry.sdk version, name, language - - OTLP-HTTP - - endpoint - (default localhost:4318) Specifies the target OTLP-HTTP backend - - timeOutSecs - (default 10secs) - - consoleDebug - (default false) - - OTLP-GRPC - - endpoint: (default localhost:4317) The endpoint to export to. By default the OpenTelemetry Collector's default endpoint. - - useSslCredentials - By default when false, uses grpc::InsecureChannelCredentials; If true uses sslCredentialsCACertPath - - sslCredentialsCACertPath - Path to .pem file to be used for SSL encryption. - - timeOutSeconds - (default 10secs) Timeout for grpc deadline -- processor - Controls span processing style. One by one as available, or in batches. - - type - (default: simple) "simple" | "batch" +- enableDefaultLogExporter - If true, creates a trace exporter outputting to the log using the default options +- exporters: - Defines a list of exporters in charge of forwarding span data to target back-end + - type - "OTLP-HTTP" | "OTLP-GRPC" | "OS" | "JLOG" + - "JLOG" + - logSpanDetails - Log span details such as description, status, kind + - logParentInfo - Log the span's parent info such as ParentSpanId, and TraceState + - logAttributes - Log the span's attributes + - logEvents - Log the span's events + - logLinks - Log the span's links + - logResources - Log the span's resources such as telemetry.sdk version, name, language + - "OTLP-HTTP" + - endpoint - (default localhost:4318) Specifies the target OTLP-HTTP backend + - timeOutSecs - (default 10secs) + - consoleDebug - (default false) + - "OTLP-GRPC" + - endpoint: (default localhost:4317) The endpoint to export to. By default the OpenTelemetry Collector's default endpoint. + - useSslCredentials - By default when false, uses grpc::InsecureChannelCredentials; If true uses sslCredentialsCACertPath + - sslCredentialsCACertPath - Path to .pem file to be used for SSL encryption. + - timeOutSeconds - (default 10secs) Timeout for grpc deadline + - batch: + - enabled - If true, trace data is processed in a batch, if false, trace data is processed immediately ### Sample configuration Below is a sample helm values block directing the HPCC tracing framework to process span information serially, and export the data over OTLP/HTTP protocol to localhost:4318 and output export debug information to console: @@ -38,11 +39,9 @@ Below is a sample helm values block directing the HPCC tracing framework to proc ```console global: tracing: - exporter: - type: OTLP-HTTP + exporters: + - type: OTLP-HTTP consoleDebug: true - processor: - type: simple ``` ### Sample configuration command diff --git a/helm/examples/tracing/jlog-collector-fulloutput.yaml b/helm/examples/tracing/jlog-collector-fulloutput.yaml index c3ce5704e49..e251f758884 100644 --- a/helm/examples/tracing/jlog-collector-fulloutput.yaml +++ b/helm/examples/tracing/jlog-collector-fulloutput.yaml @@ -1,7 +1,8 @@ global: tracing: - exporter: - type: JLog + enableDefaultLogExporter: false + exporters: + - type: JLOG logSpanDetails: true logParentInfo: true logAttributes: true diff --git a/helm/examples/tracing/otlp-grpc-collector-default.yaml b/helm/examples/tracing/otlp-grpc-collector-default.yaml index 35ea30dfc32..90ca78a56b0 100644 --- a/helm/examples/tracing/otlp-grpc-collector-default.yaml +++ b/helm/examples/tracing/otlp-grpc-collector-default.yaml @@ -1,6 +1,6 @@ global: tracing: - exporter: - type: OTLP-GRPC + exporters: + - type: OTLP-GRPC endpoint: "localhost:4317" useSslCredentials: false diff --git a/helm/examples/tracing/otlp-http-collector-default.yaml b/helm/examples/tracing/otlp-http-collector-default.yaml index 717c0984298..5c442e8552d 100644 --- a/helm/examples/tracing/otlp-http-collector-default.yaml +++ b/helm/examples/tracing/otlp-http-collector-default.yaml @@ -1,6 +1,6 @@ global: tracing: - exporter: - type: OTLP-HTTP + exporters: + - type: OTLP-HTTP endpoint: "localhost:4318" consoleDebug: true \ No newline at end of file diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index 2edfa39a80e..8b301025c9a 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -1116,29 +1116,41 @@ "type": "boolean", "description": "If true, components generate trace/span ids if none are provided by the remote caller" }, - "exporter": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["OTLP-HTTP", "OTLP-GRPC", "OS", "JLOG", "NONE"], - "description": "The type of exporter in charge of forwarding span data to target back-end" - } - } + "enableDefaultLogExporter": { + "type": "boolean", + "description": "If true, creates a trace exporter outputting to the log using the default options" }, - "processor": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["batch", "simple"], - "description": "Defines the manner in which trace data is processed - in batches, or simple as available" - } + "exporters": { + "type": "array", + "description": "List of trace exporters", + "items": { + "$ref": "#/definitions/traceExporter" } } }, "additionalProperties": { "type": ["integer", "string", "boolean"] } }, + "traceExporter": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["OTLP-HTTP", "OTLP-GRPC", "OS", "JLOG"], + "description": "The type of exporter in charge of forwarding span data to target back-end" + }, + "batch": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "If true, trace data is processed in a batch, if false, trace data is processed immediately" + } + }, + "additionalProperties": { "type": ["integer", "string", "boolean"] } + }, + "additionalProperties": { "type": ["integer", "string", "boolean"] } + } + }, "compileOption": { "type": "object", "properties": { diff --git a/plugins/javaembed/javaembed.cpp b/plugins/javaembed/javaembed.cpp index cde57fef00f..575be35f52c 100644 --- a/plugins/javaembed/javaembed.cpp +++ b/plugins/javaembed/javaembed.cpp @@ -966,8 +966,10 @@ static class JavaGlobalState // Options we know we always want set optionStrings.append("-Xrs"); + #ifdef RLIMIT_STACK // JVM has a habit of reducing the stack limit on main thread to 1M - probably dates back to when it was actually an increase... + // is this different than -XX:ThreadStackSize ? StringBuffer stackOption("-Xss"); struct rlimit limit; rlim_t slim = 0; @@ -977,6 +979,9 @@ static class JavaGlobalState slim = 8*1024*1024; if (slim >= 1*1024*1024) { + // 500m max resonable ? + if (slim >= 0x1f400000) + slim = 0x1f400000; stackOption.append((__uint64) slim); optionStrings.append(stackOption); } diff --git a/plugins/py3embed/py3embed.cpp b/plugins/py3embed/py3embed.cpp index 14220d0a352..f76dd95f2aa 100644 --- a/plugins/py3embed/py3embed.cpp +++ b/plugins/py3embed/py3embed.cpp @@ -44,6 +44,11 @@ #include "enginecontext.hpp" #include +#if defined (__linux__) || defined(__FreeBSD__) || defined(__APPLE__) +#include // comment out if not present +#define HAS_BACKTRACE +#endif + #if PY_MAJOR_VERSION >=3 #define Py_TPFLAGS_HAVE_ITER 0 #endif @@ -131,6 +136,14 @@ static void failx(const char *message, ...) StringBuffer msg; msg.append("pyembed: ").valist_appendf(message,args); va_end(args); +#ifdef HAS_BACKTRACE + void *stack[5]; + unsigned nFrames = backtrace(stack, 5); + char** strs = backtrace_symbols(stack, nFrames); + for (unsigned i = 0; i < nFrames; ++i) + msg.append("\n ").append(strs[i]); + free(strs); +#endif rtlFail(0, msg.str()); } @@ -383,6 +396,9 @@ static bool releaseContext(bool isPooled) return false; } +// GILUnblock ensures the we release the Python "Global interpreter lock" for the appropriate duration + + // Use a global object to ensure that the Python interpreter is initialized on main thread static HINSTANCE keepLoadedHandle; diff --git a/roxie/ccd/ccddebug.cpp b/roxie/ccd/ccddebug.cpp index b2a3fb3ce98..e5145668aac 100644 --- a/roxie/ccd/ccddebug.cpp +++ b/roxie/ccd/ccddebug.cpp @@ -1214,7 +1214,7 @@ class CProxyDebugContext : public CInterface if (mr) { unsigned roxieHeaderLen; - const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(roxieHeaderLen); + const RoxiePacketHeader *header = mr->getMessageHeader(roxieHeaderLen); Owned mu = mr->getCursor(rowManager); if (header->activityId == ROXIE_EXCEPTION) throwRemoteException(mu); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index a8325916f2e..2298de6e5c3 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -3352,11 +3352,11 @@ class CLocalMessageResult : implements IMessageResult, public CInterface { void *data; void *meta; - void *header; + RoxiePacketHeader *header; unsigned datalen, metalen, headerlen; public: IMPLEMENT_IINTERFACE; - CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen) + CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, RoxiePacketHeader *_header, unsigned _headerlen) { datalen = _datalen; metalen = _metalen; @@ -3378,7 +3378,7 @@ class CLocalMessageResult : implements IMessageResult, public CInterface return new CLocalMessageUnpackCursor(rowMgr, data, datalen); } - virtual const void *getMessageHeader(unsigned &length) const + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const { length = headerlen; return header; @@ -3400,11 +3400,11 @@ class CLocalBlockedMessageResult : implements IMessageResult, public CInterface { ArrayOf buffers; void *meta; - void *header; + RoxiePacketHeader *header; unsigned metalen, headerlen; public: IMPLEMENT_IINTERFACE; - CLocalBlockedMessageResult(ArrayOf &_buffers, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen) + CLocalBlockedMessageResult(ArrayOf &_buffers, void *_meta, unsigned _metalen, RoxiePacketHeader *_header, unsigned _headerlen) { buffers.swapWith(_buffers); metalen = _metalen; @@ -3424,7 +3424,7 @@ class CLocalBlockedMessageResult : implements IMessageResult, public CInterface return new CLocalBlockedMessageUnpackCursor(rowMgr, buffers); } - virtual const void *getMessageHeader(unsigned &length) const + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const { length = headerlen; return header; @@ -3574,7 +3574,7 @@ void LocalMessagePacker::flush() unsigned datalen = data.length(); unsigned metalen = meta.length(); unsigned headerlen = header.length(); - collator->enqueueMessage(outOfBand, datalen+metalen+headerlen, new CLocalMessageResult(data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen)); + collator->enqueueMessage(outOfBand, datalen+metalen+headerlen, new CLocalMessageResult(data.detach(), datalen, meta.detach(), metalen, (RoxiePacketHeader *) header.detach(), headerlen)); } // otherwise Roxie server is no longer interested and we can simply discard } @@ -3594,7 +3594,7 @@ void LocalBlockedMessagePacker::flush() unsigned headerlen = header.length(); // NOTE - takes ownership of buffers and leaves it empty if (collator->attachDataBuffers(buffers)) - collator->enqueueMessage(outOfBand, totalDataLen+metalen+headerlen, new CLocalBlockedMessageResult(buffers, meta.detach(), metalen, header.detach(), headerlen)); + collator->enqueueMessage(outOfBand, totalDataLen+metalen+headerlen, new CLocalBlockedMessageResult(buffers, meta.detach(), metalen, (RoxiePacketHeader *) header.detach(), headerlen)); } // otherwise Roxie server is no longer interested and we can simply discard } @@ -3926,7 +3926,7 @@ class PingTimer : public Thread if (mr) { unsigned headerLen; - const RoxiePacketHeader *header = (const RoxiePacketHeader *) mr->getMessageHeader(headerLen); + const RoxiePacketHeader *header = mr->getMessageHeader(headerLen); Owned mu = mr->getCursor(rowManager); PingRecord *answer = (PingRecord *) mu->getNext(sizeof(PingRecord)); if (answer && mu->atEOF() && headerLen==sizeof(RoxiePacketHeader)) diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 77d12a58604..b88fc7e5973 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -3575,7 +3575,7 @@ class CRowArrayMessageResult : implements IMessageResult, public CInterface return new CRowArrayMessageUnpackCursor(_this->data, _this); } - virtual const void *getMessageHeader(unsigned &length) const + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const { throwUnexpected(); // should never get called - I don't have a header available } @@ -5026,7 +5026,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie if (mr) { unsigned roxieHeaderLen; - const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(roxieHeaderLen); + const RoxiePacketHeader &header = * mr->getMessageHeader(roxieHeaderLen); #ifdef _DEBUG assertex(roxieHeaderLen == sizeof(RoxiePacketHeader)); #endif @@ -23908,7 +23908,7 @@ class CRoxieServerIndexReadActivity : public CRoxieServerIndexReadBaseActivity, Link(); return const_cast (this); } - virtual const void *getMessageHeader(unsigned &length) const + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const { length = 0; return NULL; diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 79c90c534f9..21c87f579c7 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -102,6 +102,7 @@ interface IMessagePacker : extends IInterface }; interface IException; +class RoxiePacketHeader; interface IMessageUnpackCursor : extends IInterface { @@ -116,7 +117,7 @@ interface IMessageUnpackCursor : extends IInterface interface IMessageResult : extends IInterface { virtual IMessageUnpackCursor *getCursor(roxiemem::IRowManager *rowMgr) const = 0; - virtual const void *getMessageHeader(unsigned &length) const = 0; + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const = 0; virtual const void *getMessageMetadata(unsigned &length) const = 0; virtual void discard() const = 0; }; diff --git a/roxie/udplib/udpmsgpk.cpp b/roxie/udplib/udpmsgpk.cpp index c2cfff81d18..a20f29d17c4 100644 --- a/roxie/udplib/udpmsgpk.cpp +++ b/roxie/udplib/udpmsgpk.cpp @@ -37,6 +37,7 @@ #include "udpmsgpk.hpp" #include "roxiemem.hpp" #include "roxie.hpp" +#include "ccd.hpp" using roxiemem::DataBuffer; using roxiemem::IRowManager; @@ -65,9 +66,10 @@ class PackageSequencer : public CInterface, implements IInterface DataBuffer *tail = nullptr; unsigned metaSize; unsigned headerSize; - const void *header; + const RoxiePacketHeader *header; unsigned maxSeqSeen = 0; unsigned numPackets = 0; + bool outOfBand = false; #ifdef _DEBUG unsigned scans = 0; unsigned overscans = 0; @@ -228,7 +230,8 @@ class PackageSequencer : public CInterface, implements IInterface // MORE - Is this safe - header lifetime is somewhat unpredictable without a copy of it... // Client header is at the start of packet 0 headerSize = *(unsigned short *)(finger->data + sizeof(UdpPacketHeader)); - header = finger->data + sizeof(UdpPacketHeader) + sizeof(unsigned short); + header = (const RoxiePacketHeader *) (finger->data + sizeof(UdpPacketHeader) + sizeof(unsigned short)); + outOfBand = (header->overflowSequence & OUTOFBAND_SEQUENCE) != 0; packetDataSize -= headerSize + sizeof(unsigned short); } if (fingerHdr->metalength) @@ -261,15 +264,18 @@ class PackageSequencer : public CInterface, implements IInterface return metadata.toByteArray(); } - inline const void *getMessageHeader() + inline const RoxiePacketHeader *getMessageHeader() { return header; } - inline unsigned getHeaderSize() { return headerSize; } + inline bool isOutOfBand() + { + return outOfBand; + } }; @@ -437,7 +443,7 @@ class CMessageResult : public IMessageResult, CInterface { return new CMessageUnpackCursor(LINK(pkSequencer), rowMgr); } - virtual const void *getMessageHeader(unsigned &length) const + virtual const RoxiePacketHeader *getMessageHeader(unsigned &length) const override { length = pkSequencer->getHeaderSize(); return pkSequencer->getMessageHeader(); @@ -480,7 +486,7 @@ CMessageCollator::~CMessageCollator() while (!queue.empty()) { PackageSequencer *pkSqncr = queue.front(); - queue.pop(); + queue.pop_front(); pkSqncr->Release(); } } @@ -579,7 +585,10 @@ void CMessageCollator::collate(DataBuffer *dataBuff) mapping.remove(puid); } queueCrit.enter(); - queue.push(pkSqncr); + if (pkSqncr->isOutOfBand()) + queue.push_front(pkSqncr); + else + queue.push_back(pkSqncr); queueCrit.leave(); sem.signal(); } @@ -606,7 +615,7 @@ IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActi { queueCrit.enter(); PackageSequencer *pkSqncr = queue.front(); - queue.pop(); + queue.pop_front(); queueCrit.leave(); anyActivity = true; activity = false; diff --git a/roxie/udplib/udpmsgpk.hpp b/roxie/udplib/udpmsgpk.hpp index 10f86dfeebb..5ab5c501231 100644 --- a/roxie/udplib/udpmsgpk.hpp +++ b/roxie/udplib/udpmsgpk.hpp @@ -30,7 +30,7 @@ class CMessageCollator : public CInterfaceOf { private: Linked rowMgr; // Must be placed first to ensure it is destroyed last - std::queue queue; + std::deque queue; msg_map mapping; // Note - only accessed from collator thread RelaxedAtomic activity; bool memLimitExceeded; diff --git a/roxie/udplib/udptransport.cmake b/roxie/udplib/udptransport.cmake index 3914e01e666..9f1e2f629f8 100644 --- a/roxie/udplib/udptransport.cmake +++ b/roxie/udplib/udptransport.cmake @@ -34,6 +34,11 @@ include_directories ( ./../../system/include ./../../system/jlib ./../../roxie/ccd + ./../../roxie/udplib + ./../../common/thorhelper + ./../../common/workunit + ./../../rtl/include + ./../../rtl/eclrtl ) ADD_DEFINITIONS ( -D_CONSOLE ) diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp index 49271868a9f..e4598a84974 100644 --- a/roxie/udplib/udptrr.cpp +++ b/roxie/udplib/udptrr.cpp @@ -643,7 +643,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface { // It's a duplicate request-to-send - either they lost the request_received, or the ok_to_send (which has timed out) // whichever is the case we should resend the acknowledgement to prevent the sender flooding us with requests - if (udpTraceLevel || udpTraceFlow) + if (udpTraceLevel >= 2 || udpTraceFlow) { StringBuffer s; DBGLOG("UdpFlow: Duplicate requestToSend %" SEQF "u from node %s", _flowSeq, dest.getHostText(s).str()); diff --git a/roxie/udplib/uttest.cpp b/roxie/udplib/uttest.cpp index 1ca82b49a4a..87a4d07a9b4 100644 --- a/roxie/udplib/uttest.cpp +++ b/roxie/udplib/uttest.cpp @@ -18,6 +18,7 @@ ///* simple test #include "udplib.hpp" #include "roxiemem.hpp" +#include "ccd.hpp" //#include "udptrr.hpp" //#include "udptrs.hpp" @@ -89,7 +90,7 @@ bool readRows = true; IpAddressArray allNodes; -struct TestHeader +struct TestHeader : public RoxiePacketHeader { unsigned sequence; unsigned nodeIndex; @@ -246,7 +247,7 @@ class Receiver : public Thread break; } else - UNIMPLEMENTED; + throwUnexpected(); } } } @@ -324,7 +325,9 @@ void testNxN() while (dontSendToSelf&&(dest==myIndex)); if (!packers[dest]) { - TestHeader t = {sequences[dest], myIndex}; + TestHeader t; + t.sequence = sequences[dest]; + t.nodeIndex = myIndex; ServerIdentifier destServer; destServer.setIp(allNodes.item(dest)); packers[dest] = sendMgr->createMessagePacker(1, sequences[dest], &t, sizeof(t), destServer, 0); diff --git a/rtl/eclrtl/eclhelper_base.cpp b/rtl/eclrtl/eclhelper_base.cpp index d9007b54a28..fac7194d86b 100644 --- a/rtl/eclrtl/eclhelper_base.cpp +++ b/rtl/eclrtl/eclhelper_base.cpp @@ -619,7 +619,7 @@ void CThorSoapActionArg::getLogTailText(size32_t & lenText, char * & text, const const char * CThorSoapActionArg::getXpathHintsXml() { return nullptr;} const char * CThorSoapActionArg::getRequestHeader() { return nullptr; } const char * CThorSoapActionArg::getRequestFooter() { return nullptr; } -unsigned CThorSoapActionArg::getPersistPoolSize() { return 0; } +unsigned CThorSoapActionArg::getPersistMaxRequests() { return 0; } //CThorSoapCallArg @@ -645,7 +645,7 @@ const char * CThorSoapCallArg::getInputIteratorPath() { return NULL; } const char * CThorSoapCallArg::getXpathHintsXml() { return nullptr; } const char * CThorSoapCallArg::getRequestHeader() { return nullptr; } const char * CThorSoapCallArg::getRequestFooter() { return nullptr; } -unsigned CThorSoapCallArg::getPersistPoolSize() { return 0; } +unsigned CThorSoapCallArg::getPersistMaxRequests() { return 0; } size32_t CThorSoapCallArg::onFailTransform(ARowBuilder & rowBuilder, const void * left, IException * e) { return 0; } void CThorSoapCallArg::getLogText(size32_t & lenText, char * & text, const void * left) { lenText =0; text = NULL; } diff --git a/rtl/include/eclhelper.hpp b/rtl/include/eclhelper.hpp index 621d55d0a92..aaee4136d36 100644 --- a/rtl/include/eclhelper.hpp +++ b/rtl/include/eclhelper.hpp @@ -2234,7 +2234,7 @@ enum SOAPFlogusertail = 0x020000, SOAPFformEncoded = 0x040000, SOAPFpersist = 0x080000, - SOAPFpersistPool = 0x100000, + SOAPFpersistMax = 0x100000, }; struct IHThorWebServiceCallActionArg : public IHThorArg @@ -2269,7 +2269,7 @@ struct IHThorWebServiceCallActionArg : public IHThorArg virtual const char * getRequestHeader() = 0; virtual const char * getRequestFooter() = 0; virtual void getLogTailText(size32_t & lenText, char * & text, const void * left) = 0; // iff SOAPFlogusertail set - virtual unsigned getPersistPoolSize() = 0; // only available iff SOAPFpersistPool + virtual unsigned getPersistMaxRequests() = 0; // only available iff SOAPFpersistMax }; typedef IHThorWebServiceCallActionArg IHThorSoapActionArg ; typedef IHThorWebServiceCallActionArg IHThorHttpActionArg ; diff --git a/rtl/include/eclhelper_base.hpp b/rtl/include/eclhelper_base.hpp index d34c664d013..0a0ef1c5e08 100644 --- a/rtl/include/eclhelper_base.hpp +++ b/rtl/include/eclhelper_base.hpp @@ -822,7 +822,7 @@ class ECLRTL_API CThorSoapActionArg : public CThorSinkArgOf virtual const char * getXpathHintsXml() override; virtual const char * getRequestHeader() override; virtual const char * getRequestFooter() override; - virtual unsigned getPersistPoolSize() override; + virtual unsigned getPersistMaxRequests() override; }; class ECLRTL_API CThorSoapCallArg : public CThorArgOf @@ -853,7 +853,7 @@ class ECLRTL_API CThorSoapCallArg : public CThorArgOf virtual const char * getXpathHintsXml() override; virtual const char * getRequestHeader() override; virtual const char * getRequestFooter() override; - virtual unsigned getPersistPoolSize() override; + virtual unsigned getPersistMaxRequests() override; }; typedef CThorSoapCallArg CThorHttpCallArg; diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index cc87f6afd0e..d0496dc84df 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -3264,7 +3264,8 @@ void doCopyFile(IFile * target, IFile * source, size32_t buffersize, ICopyFilePr // try to delete partial copy StringBuffer s; s.append("copyFile target=").append(dest->queryFilename()).append(" source=").append(source->queryFilename()).appendf("; read/write failure (%d): ",e->errorCode()); - exc.setown(MakeStringException(e->errorCode(), "%s", s.str())); + e->errorMessage(s); + exc.setown(makeStringException(e->errorCode(), s.str())); e->Release(); EXCLOG(exc, "doCopyFile"); } diff --git a/system/jlib/jio.cpp b/system/jlib/jio.cpp index a4ae5a4ef95..649e092a7e5 100644 --- a/system/jlib/jio.cpp +++ b/system/jlib/jio.cpp @@ -38,8 +38,6 @@ #define MAX_RANDOM_CACHE_SIZE 0x10000 #define RANDOM_CACHE_DEPTH 10 -constexpr unsigned timelimit=100; - #define MINCOMPRESSEDROWSIZE 16 #define MAXCOMPRESSEDROWSIZE 0x4000 @@ -678,253 +676,6 @@ extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delt return new CDeltaRecordSize(size, delta); } -//================================================================================================== - -// Elevator scanning -#define MAX_PENDING 20000 - -class ElevatorScanner; - -class PendingFetch : public IInterface, public CInterface -{ -public: - IMPLEMENT_IINTERFACE; - - static int compare(const void *a, const void *b); - - offset_t pos; - IReceiver *receiver; - void *target; - IRecordFetchChannel *channel; -}; - -class ElevatorChannel : implements IRecordFetchChannel, public CInterface -{ -private: - bool cancelled; - bool immediate; - ElevatorScanner &scanner; -public: - IMPLEMENT_IINTERFACE; - - ElevatorChannel(ElevatorScanner &, bool); - ~ElevatorChannel(); - -//Interface IRecordFetchChannel - virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver); - virtual void flush(); - virtual void abort() { cancelled = true; } - virtual bool isAborted() { return cancelled; } - virtual bool isImmediate() { return immediate; } -}; - -class ElevatorScanner : public Thread, public IRecordFetcher -{ -private: - Monitor scanlist; - Monitor isRoom; - PendingFetch pending[MAX_PENDING]; - unsigned nextSlot; - size32_t recordSize; - int file; - offset_t reads; - unsigned scans; - bool stopped; - unsigned duetime; - - void scan(); - void doFetch(PendingFetch &); - void stop(); - void resetTimer() - { - duetime = msTick()+timelimit; - } - -public: - IMPLEMENT_IINTERFACE_USING(Thread); - virtual void beforeDispose(); - - ElevatorScanner(int file, size32_t recordSize); - ~ElevatorScanner(); - -//Interface IRecordFetcher - virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); } - -//Interface Thread - virtual int run(); - - void flush(IRecordFetchChannel *); - void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *); -}; - -int PendingFetch::compare(const void *a, const void *b) -{ - offset_t aa = ((PendingFetch *) a)->pos; - offset_t bb = ((PendingFetch *) b)->pos; - if (aa > bb) - return 1; - else if (aa == bb) - return 0; - else - return -1; -} - -ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner) -{ - scanner.Link(); - cancelled = false; - immediate = _immediate; -} - -ElevatorChannel::~ElevatorChannel() -{ - flush(); - scanner.Release(); -} - -void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver) -{ - scanner.fetch(fpos, buffer, receiver, this); -} - -void ElevatorChannel::flush() -{ - scanner.flush(this); -} - -ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner") -{ - file = _file; - recordSize = _recordSize; - nextSlot = 0; - reads = 0; - scans = 0; - stopped = false; - start(); -} - -ElevatorScanner::~ElevatorScanner() -{ - IERRLOG("Elevator scanner statistics: %" I64F "d reads (%" I64F "d bytes), %d scans", reads, reads*recordSize, scans); -} - -void ElevatorScanner::beforeDispose() -{ - stop(); - join(); -} - -void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel) -{ - synchronized procedure(scanlist); - if (channel->isImmediate()) - { - // MORE - atomic seek/read would be preferable! - checked_lseeki64(file, fpos, SEEK_SET); - checked_read(file, buffer, recordSize); - reads++; - if (!receiver->takeRecord(fpos)) - channel->abort(); - return; - } - { - synchronized block(isRoom); - while (nextSlot >= MAX_PENDING) - isRoom.wait(); - } - if (!channel->isAborted()) - { - pending[nextSlot].pos = fpos; - pending[nextSlot].receiver = receiver; - pending[nextSlot].target = buffer; - pending[nextSlot].channel = channel; - - nextSlot++; - resetTimer(); - scanlist.notify(); - } -} - -void ElevatorScanner::doFetch(PendingFetch &next) -{ - if (!next.channel->isAborted()) - { - // MORE - atomic seek/read would be preferable! - checked_lseeki64(file, next.pos, SEEK_SET); - checked_read(file, next.target, recordSize); - reads++; - if (!next.receiver->takeRecord(next.pos)) - next.channel->abort(); - } -} - -void ElevatorScanner::scan() -{ - DBGLOG("Starting elevator scan of %d items", nextSlot); - scans++; - qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare); - for (unsigned i = 0; i < nextSlot; i++) - { - doFetch(pending[i]); - } - nextSlot = 0; - { - synchronized block(isRoom); - isRoom.notify(); - } - DBGLOG("Finished elevator scan"); -} - -void ElevatorScanner::flush(IRecordFetchChannel *) -{ - // MORE - I could just flush what was asked for, but I may as well flush the lot. - synchronized procedure(scanlist); - if (nextSlot) - scan(); -} - -int ElevatorScanner::run() -{ - scanlist.lock(); - for (;;) - { - while (nextSlot= (start+timeout)) + if(now - start >= timeout) return false; try { diff --git a/system/jlib/jmisc.cpp b/system/jlib/jmisc.cpp index f6560ec9967..7f0d55d5e74 100644 --- a/system/jlib/jmisc.cpp +++ b/system/jlib/jmisc.cpp @@ -68,7 +68,6 @@ void _rev(size32_t len, void * _ptr) Mutex printMutex; FILE *logFile; FILE *stdlog = stderr; -HiresTimer logTimer; class CStdLogIntercept: public ILogIntercept { bool nl; diff --git a/system/jlib/jsecrets.cpp b/system/jlib/jsecrets.cpp index 17833f231f2..6f9191c83a2 100644 --- a/system/jlib/jsecrets.cpp +++ b/system/jlib/jsecrets.cpp @@ -1098,6 +1098,9 @@ static IPropertyTree * resolveLocalSecret(const char *category, const char * nam read(io, 0, (size32_t)-1, content); if (!content.length()) continue; + + //Always add a null terminator to data read from a file so that queryProp() can be used on the resultant tree + content.append((byte)0); tree->setPropBin(name, content.length(), content.bufferBase()); } diff --git a/system/jlib/jtrace.cpp b/system/jlib/jtrace.cpp index fcbff591a62..3efa687d0d8 100644 --- a/system/jlib/jtrace.cpp +++ b/system/jlib/jtrace.cpp @@ -459,6 +459,8 @@ class CTraceManager : implements ITraceManager, public CInterface void initTracerProviderAndGlobalInternals(const IPropertyTree * traceConfig); void initTracer(const IPropertyTree * traceConfig); void cleanupTracer(); + std::unique_ptr createExporter(const IPropertyTree * exportConfig); + std::unique_ptr createProcessor(const IPropertyTree * exportConfig); public: CTraceManager(const char * componentName, const IPropertyTree * componentConfig, const IPropertyTree * globalConfig); @@ -1066,161 +1068,163 @@ IProperties * getSpanContext(const ISpan * span) //--------------------------------------------------------------------------------------------------------------------- -void CTraceManager::initTracerProviderAndGlobalInternals(const IPropertyTree * traceConfig) +std::unique_ptr CTraceManager::createExporter(const IPropertyTree * exportConfig) { - //Trace to JLog by default. - std::unique_ptr exporter = JLogSpanExporterFactory::Create(DEFAULT_SPAN_LOG_FLAGS); + assertex(exportConfig); + + StringBuffer exportType; + exportConfig->getProp("@type", exportType); - //Administrators can choose to export trace data to a different backend by specifying the exporter type - if (traceConfig && traceConfig->hasProp("exporter")) + LOG(MCoperatorInfo, "Exporter type: %s", exportType.str()); + if (!exportType.isEmpty()) { - Owned exportConfig = traceConfig->getPropTree("exporter"); - if (exportConfig) + if (stricmp(exportType.str(), "OS")==0) //To stdout/err { - StringBuffer exportType; - exportConfig->getProp("@type", exportType); - LOG(MCoperatorInfo, "Exporter type: %s", exportType.str()); - - if (!exportType.isEmpty()) - { - if (stricmp(exportType.str(), "OS")==0) //To stdout/err - { - exporter = opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(); - LOG(MCoperatorInfo, "Tracing exporter set OS"); - } - else if (stricmp(exportType.str(), "OTLP")==0 || stricmp(exportType.str(), "OTLP-HTTP")==0) - { - opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts; - const char * endPoint = exportConfig->queryProp("@endpoint"); - if (endPoint) - trace_opts.url = endPoint; + LOG(MCoperatorInfo, "Tracing exporter set OS"); + return opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(); + } + else if (stricmp(exportType.str(), "OTLP")==0 || stricmp(exportType.str(), "OTLP-HTTP")==0) + { + opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts; + const char * endPoint = exportConfig->queryProp("@endpoint"); + if (endPoint) + trace_opts.url = endPoint; - if (exportConfig->hasProp("@timeOutSecs")) //not sure exactly what this value actually affects - trace_opts.timeout = std::chrono::seconds(exportConfig->getPropInt("@timeOutSecs")); + if (exportConfig->hasProp("@timeOutSecs")) //not sure exactly what this value actually affects + trace_opts.timeout = std::chrono::seconds(exportConfig->getPropInt("@timeOutSecs")); - // Whether to print the status of the exporter in the console - trace_opts.console_debug = exportConfig->getPropBool("@consoleDebug", false); + // Whether to print the status of the exporter in the console + trace_opts.console_debug = exportConfig->getPropBool("@consoleDebug", false); - exporter = opentelemetry::exporter::otlp::OtlpHttpExporterFactory::Create(trace_opts); - LOG(MCoperatorInfo,"Tracing exporter set to OTLP/HTTP to: (%s)", trace_opts.url.c_str()); - } - else if (stricmp(exportType.str(), "OTLP-GRPC")==0) - { - namespace otlp = opentelemetry::exporter::otlp; + LOG(MCoperatorInfo,"Tracing exporter set to OTLP/HTTP to: (%s)", trace_opts.url.c_str()); + return opentelemetry::exporter::otlp::OtlpHttpExporterFactory::Create(trace_opts); + } + else if (stricmp(exportType.str(), "OTLP-GRPC")==0) + { + namespace otlp = opentelemetry::exporter::otlp; - otlp::OtlpGrpcExporterOptions opts; + otlp::OtlpGrpcExporterOptions opts; - const char * endPoint = exportConfig->queryProp("@endpoint"); - if (endPoint) - opts.endpoint = endPoint; + const char * endPoint = exportConfig->queryProp("@endpoint"); + if (endPoint) + opts.endpoint = endPoint; - opts.use_ssl_credentials = exportConfig->getPropBool("@useSslCredentials", false); + opts.use_ssl_credentials = exportConfig->getPropBool("@useSslCredentials", false); - if (opts.use_ssl_credentials) - { - StringBuffer sslCACertPath; - exportConfig->getProp("@sslCredentialsCACertPath", sslCACertPath); - opts.ssl_credentials_cacert_path = sslCACertPath.str(); - } + if (opts.use_ssl_credentials) + { + StringBuffer sslCACertPath; + exportConfig->getProp("@sslCredentialsCACertPath", sslCACertPath); + opts.ssl_credentials_cacert_path = sslCACertPath.str(); + } - if (exportConfig->hasProp("@timeOutSecs")) //grpc deadline timeout in seconds - opts.timeout = std::chrono::seconds(exportConfig->getPropInt("@timeOutSecs")); + if (exportConfig->hasProp("@timeOutSecs")) //grpc deadline timeout in seconds + opts.timeout = std::chrono::seconds(exportConfig->getPropInt("@timeOutSecs")); - exporter = otlp::OtlpGrpcExporterFactory::Create(opts); - LOG(MCoperatorInfo, "Tracing exporter set to OTLP/GRPC to: (%s)", opts.endpoint.c_str()); - } - else if (stricmp(exportType.str(), "JLOG")==0) - { - StringBuffer logFlagsStr; - SpanLogFlags logFlags = SpanLogFlags::LogNone; - - if (exportConfig->getPropBool("@logSpanDetails", false)) - { - logFlags |= SpanLogFlags::LogSpanDetails; - logFlagsStr.append(" LogDetails "); - } - if (exportConfig->getPropBool("@logParentInfo", false)) - { - logFlags |= SpanLogFlags::LogParentInfo; - logFlagsStr.append(" LogParentInfo "); - } - if (exportConfig->getPropBool("@logAttributes", false)) - { - logFlags |= SpanLogFlags::LogAttributes; - logFlagsStr.append(" LogAttributes "); - } - if (exportConfig->getPropBool("@logEvents", false)) - { - logFlags |= SpanLogFlags::LogEvents; - logFlagsStr.append(" LogEvents "); - } - if (exportConfig->getPropBool("@logLinks", false)) - { - logFlags |= SpanLogFlags::LogLinks; - logFlagsStr.append(" LogLinks "); - } - if (exportConfig->getPropBool("@logResources", false)) - { - logFlags |= SpanLogFlags::LogResources; - logFlagsStr.append(" LogLinks "); - } + LOG(MCoperatorInfo, "Tracing exporter set to OTLP/GRPC to: (%s)", opts.endpoint.c_str()); + return otlp::OtlpGrpcExporterFactory::Create(opts); + } + else if (stricmp(exportType.str(), "JLOG")==0) + { + StringBuffer logFlagsStr; + SpanLogFlags logFlags = SpanLogFlags::LogNone; - //if no log feature flags provided, use default - if (logFlags == SpanLogFlags::LogNone) - logFlags = DEFAULT_SPAN_LOG_FLAGS; + if (exportConfig->getPropBool("@logSpanDetails", false)) + { + logFlags |= SpanLogFlags::LogSpanDetails; + logFlagsStr.append(" LogDetails "); + } + if (exportConfig->getPropBool("@logParentInfo", false)) + { + logFlags |= SpanLogFlags::LogParentInfo; + logFlagsStr.append(" LogParentInfo "); + } + if (exportConfig->getPropBool("@logAttributes", false)) + { + logFlags |= SpanLogFlags::LogAttributes; + logFlagsStr.append(" LogAttributes "); + } + if (exportConfig->getPropBool("@logEvents", false)) + { + logFlags |= SpanLogFlags::LogEvents; + logFlagsStr.append(" LogEvents "); + } + if (exportConfig->getPropBool("@logLinks", false)) + { + logFlags |= SpanLogFlags::LogLinks; + logFlagsStr.append(" LogLinks "); + } + if (exportConfig->getPropBool("@logResources", false)) + { + logFlags |= SpanLogFlags::LogResources; + logFlagsStr.append(" LogLinks "); + } - exporter = JLogSpanExporterFactory::Create(logFlags); + //if no log feature flags provided, use default + if (logFlags == SpanLogFlags::LogNone) + logFlags = DEFAULT_SPAN_LOG_FLAGS; - LOG(MCoperatorInfo, "Tracing exporter set to JLog: logFlags( LogAttributes LogParentInfo %s)", logFlagsStr.str()); - } - else if (stricmp(exportType.str(), "Prometheus")==0) - LOG(MCoperatorInfo, "Tracing to Prometheus currently not supported"); - else if (stricmp(exportType.str(), "NONE")==0) - { - exporter = NoopSpanExporterFactory::Create(); - LOG(MCoperatorInfo, "Tracing exporter set to 'NONE', no trace exporting will be performed"); - } - else - LOG(MCoperatorInfo, "Tracing exporter type not supported: '%s', JLog trace exporting will be performed", exportType.str()); - } - else - LOG(MCoperatorInfo, "Tracing exporter type not specified"); + LOG(MCoperatorInfo, "Tracing exporter set to JLog: logFlags( LogAttributes LogParentInfo %s)", logFlagsStr.str()); + return JLogSpanExporterFactory::Create(logFlags); } + else + LOG(MCoperatorWarning, "Tracing exporter type not supported: '%s'", exportType.str()); } + else + LOG(MCoperatorWarning, "Tracing exporter type not specified"); + return nullptr; +} - //Administrator can choose to process spans in batches or one at a time - //Default: SimpleSpanProcesser sends spans one by one to an exporter. - std::unique_ptr processor = opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter)); - if (traceConfig && traceConfig->hasProp("processor/@type")) - { - StringBuffer processorType; - bool foundProcessorType = traceConfig->getProp("processor/@type", processorType); +std::unique_ptr CTraceManager::createProcessor(const IPropertyTree * exportConfig) +{ + auto exporter = createExporter(exportConfig); + if (!exporter) + return nullptr; + + if (exportConfig->getPropBool("batch/@enabled", false)) + { + //Groups several spans together, before sending them to an exporter. + //MORE: These options should be configurable from batch/@option + opentelemetry::v1::sdk::trace::BatchSpanProcessorOptions options; //size_t max_queue_size = 2048; + //The time interval between two consecutive exports + //std::chrono::milliseconds(5000); + //The maximum batch size of every export. It must be smaller or + //equal to max_queue_size. + //size_t max_export_batch_size = 512 + return opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(std::move(exporter), options); + } - if (foundProcessorType && strcmp("batch", processorType.str())==0) - { - //Groups several spans together, before sending them to an exporter. - //These options should be configurable - opentelemetry::v1::sdk::trace::BatchSpanProcessorOptions options; //size_t max_queue_size = 2048; - //The time interval between two consecutive exports - //std::chrono::milliseconds(5000); - //The maximum batch size of every export. It must be smaller or - //equal to max_queue_size. - //size_t max_export_batch_size = 512 - processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(std::move(exporter), options); - LOG(MCoperatorInfo, "OpenTel tracing using batch Span Processor"); - } - else if (foundProcessorType && strcmp("simple", processorType.str())==0) - { - LOG(MCoperatorInfo, "OpenTel tracing using batch simple Processor"); - } - else + return opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter)); +} + +void CTraceManager::initTracerProviderAndGlobalInternals(const IPropertyTree * traceConfig) +{ + std::vector> processors; + + //By default trace spans to the logs in debug builds - so that developers get used to seeing them. + //Default off for release builds to avoid flooding the logs, and because they are likely to use OTLP + bool enableDefaultLogExporter = isDebugBuild(); + if (traceConfig) + { + //Administrators can choose to export trace data to a different backend by specifying the exporter type + Owned iter = traceConfig->getElements("exporters"); + ForEach(*iter) { - LOG(MCoperatorInfo, "OpenTel tracing detected invalid processor type: '%s'", processorType.str()); + IPropertyTree & curExporter = iter->query(); + std::unique_ptr processor = createProcessor(&curExporter); + if (processor) + processors.push_back(std::move(processor)); } + + enableDefaultLogExporter = traceConfig->getPropBool("enableDefaultLogExporter", enableDefaultLogExporter); } - std::vector> processors; - processors.push_back(std::move(processor)); + if (enableDefaultLogExporter) + { + //Simple option to create logging to the log file - primarily to aid developers. + std::unique_ptr exporter = JLogSpanExporterFactory::Create(DEFAULT_SPAN_LOG_FLAGS); + processors.push_back(opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter))); + } // Default is an always-on sampler. std::shared_ptr context = @@ -1239,13 +1243,13 @@ Expected Configuration format: disabled: true #optional - disable OTel tracing alwaysCreateGlobalIds : false #optional - should global ids always be created? alwaysCreateTraceIds #optional - should trace ids always be created? - exporter: #optional - Controls how trace data is exported/reported - type: OTLP #OS|OTLP|Prometheus|JLOG (default: JLOG) + exporters: #optional - Controls how trace data is exported/reported + - type: OTLP #OS|OTLP|Prometheus|JLOG endpoint: "localhost:4317" #exporter specific key/value pairs useSslCredentials: true sslCredentialsCACcert: "ssl-certificate" - processor: #optional - Controls span processing style - type: batch #simple|batch (default: simple) + batch: #optional - Controls span processing style + enabled #is batched processing enabled? */ void CTraceManager::initTracer(const IPropertyTree * traceConfig) { diff --git a/system/jlib/jutil.hpp b/system/jlib/jutil.hpp index a6fa94f2a46..c0fffad2c63 100644 --- a/system/jlib/jutil.hpp +++ b/system/jlib/jutil.hpp @@ -315,6 +315,13 @@ inline constexpr bool isContainerized() { return true; } inline constexpr bool isContainerized() { return false; } #endif +//Same for isDebugBuild() rather than requiring #ifdef _DEBUG +#ifdef _DEBUG +inline constexpr bool isDebugBuild() { return true; } +#else +inline constexpr bool isDebugBuild() { return false; } +#endif + #ifndef arraysize #define arraysize(T) (sizeof(T)/sizeof(*T)) #endif diff --git a/testing/helm/tests/tracing.yaml b/testing/helm/tests/tracing.yaml new file mode 100644 index 00000000000..801c420aee9 --- /dev/null +++ b/testing/helm/tests/tracing.yaml @@ -0,0 +1,14 @@ +global: + tracing: + enableDefaultLogExporter: true + exporters: + - type: OTLP-HTTP + consoleDebug: true + batch: + enabled: true + - type: JLOG + logSpanDetails: true + logParentInfo: true + logAttributes: true + logEvents: true + logLinks: true diff --git a/testing/regress/ecl/common/SoapTextTest.ecl b/testing/regress/ecl/common/SoapTextTest.ecl index afb91f99709..26b1d0fbbf9 100644 --- a/testing/regress/ecl/common/SoapTextTest.ecl +++ b/testing/regress/ecl/common/SoapTextTest.ecl @@ -86,10 +86,10 @@ EXPORT SoapTextTest := MODULE EXPORT doMain(string serviceUrl, string searchWords, unsigned documentLimit) := FUNCTION - soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse).cnt; + soapcallDocumentCount(string searchWord) := SOAPCALL(serviceUrl, 'soaptest_getdocumentcount', countServiceRequest, transform(countServiceRequest, SELF.search := searchWord), countServiceResponse, PERSIST).cnt; callDocumentCount(string search) := IF((serviceUrl != ''), soapcallDocumentCount(search), doDocumentCount(search)); - soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord)); + soapcallSearchWords(DATASET(wordRec) searchWords) := SOAPCALL(serviceUrl, 'soaptest_getsearchwords', { DATASET(wordRec) search := searchWords }, DATASET(joinServiceResponseRecord), PERSIST); callSearchWords(DATASET(wordRec) searchWords) := IF((serviceUrl != ''), soapcallSearchWords(searchWords), doSearchWords(searchWords)); splitWords := Std.Str.SplitWords(searchWords, ',', false); diff --git a/thorlcr/activities/join/thjoin.cpp b/thorlcr/activities/join/thjoin.cpp index 28eba299992..0f46d169adc 100644 --- a/thorlcr/activities/join/thjoin.cpp +++ b/thorlcr/activities/join/thjoin.cpp @@ -344,7 +344,7 @@ class JoinActivityMaster : public CMasterActivity CMasterActivity::fireException(e); lastMsgTime = msTick(); } - else if (msTick() > lastMsgTime + MSGTIME) + else if (msTick() - lastMsgTime > MSGTIME) { Owned e = MakeActivityWarning(this, -1, "SELFJOIN: Warning %d preliminary matches, join will take some time", count); CMasterActivity::fireException(e); diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index a67532ef418..7d571da3d8f 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -661,7 +661,7 @@ void CMasterActivity::updateFileReadCostStats() { // Legacy file: calculate readCost using prev disk reads and new disk reads stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); - legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads)); + legacyReadCost = calcFileAccessCost(clusterName, 0, prevDiskReads); } stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads); if(useJhtreeCacheStats) @@ -669,10 +669,10 @@ void CMasterActivity::updateFileReadCostStats() stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches) + stats.getStatisticSum(StNumLeafDiskFetches) + stats.getStatisticSum(StNumBlobDiskFetches); - curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads)); + curReadCost = calcFileAccessCost(clusterName, 0, numActualReads); } else - curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads)); + curReadCost = calcFileAccessCost(clusterName, 0, curDiskReads); file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost); file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads); return curReadCost; @@ -728,7 +728,7 @@ void CMasterActivity::updateFileWriteCostStats(IFileDescriptor & fileDesc, IProp assertex(fileDesc.numClusters()>=1); StringBuffer clusterName; fileDesc.getClusterGroupName(0, clusterName);// Note: calculating for 1st cluster. (Future: calc for >1 clusters) - cost_type writeCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0)); + cost_type writeCost = calcFileAccessCost(clusterName, numDiskWrites, 0); props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost); diskAccessCost = writeCost; }