From d167c568eb264293beb01779e15cae36451c4040 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 28 Jun 2023 10:39:59 +0100 Subject: [PATCH 1/2] HPCC-29831 Allow localAgent (and other) setting to be set in workunit/regression suite These options are only applied when Roxie is run in workunit mode in containerized systems. Signed-off-by: Richard Chapman --- common/workunit/workunit.cpp | 36 ++++++++++++++++++++------- roxie/ccd/ccd.hpp | 1 + roxie/ccd/ccdmain.cpp | 39 ++++++++++++++++++++++++++---- roxie/ccd/ccdserver.cpp | 5 ++-- testing/regress/ecl/remote.ecl | 6 +++++ testing/regress/ecl/stresstext.ecl | 7 ++++++ 6 files changed, 77 insertions(+), 17 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index b64b563b8f8..4645d1f3c9e 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -8500,7 +8500,9 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool a bool CLocalWorkUnit::hasDebugValue(const char *propname) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); return p->hasProp(prop.append(lower)); @@ -8509,7 +8511,9 @@ bool CLocalWorkUnit::hasDebugValue(const char *propname) const IStringVal& CLocalWorkUnit::getDebugValue(const char *propname, IStringVal &str) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); str.set(p->queryProp(prop.append(lower).str())); @@ -8528,7 +8532,9 @@ IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const if (prop) { StringBuffer lower; - lower.append(prop).toLowerCase(); + lower.append(prop); + if (!strchr(lower, ':')) + lower.toLowerCase(); path.append(lower); } else @@ -8539,7 +8545,9 @@ IStringIterator& 
CLocalWorkUnit::getDebugValues(const char *prop) const int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); @@ -8549,7 +8557,9 @@ int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); @@ -8559,7 +8569,9 @@ __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal) double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); @@ -8569,7 +8581,9 @@ double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) co bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); @@ -8642,7 +8656,9 @@ void CLocalWorkUnit::addProcess(const char *type, const char *instance, unsigned void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite) { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); @@ -8657,7 +8673,9 @@ void 
CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite) { StringBuffer lower; - lower.append(propname).toLowerCase(); + lower.append(propname); + if (!strchr(lower, ':')) + lower.toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index a81d5bf9ec4..f70b654b18a 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -402,6 +402,7 @@ extern bool defaultExecuteDependenciesSequentially; extern bool defaultStartInputsSequentially; extern bool oneShotRoxie; extern bool traceStrands; +extern unsigned minPayloadSize; extern int backgroundCopyClass; extern int backgroundCopyPrio; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 9c3563d1b4b..5448ac16787 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -102,6 +102,7 @@ bool mergeAgentStatistics = true; PTreeReaderOptions defaultXmlReadFlags = ptr_ignoreWhiteSpace; bool runOnce = false; bool oneShotRoxie = false; +unsigned minPayloadSize = 800; unsigned udpMulticastBufferSize = 262142; #if !defined(_CONTAINERIZED) && !defined(SUBCHANNELS_IN_HEADER) @@ -122,7 +123,7 @@ bool lockSuperFiles; bool useRemoteResources; bool checkFileDate; bool lazyOpen; -bool localAgent; +bool localAgent = false; bool encryptInTransit; bool useAeron; bool ignoreOrphans; @@ -691,13 +692,43 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) useOldTopology = checkFileExists(topologyFile.str()); topology = loadConfiguration(useOldTopology ? 
nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress"); saveTopology(); + + // Any settings we read from topology that must NOT be overridden in workunit debug fields should be read at this point, before the following section + getAllowedPipePrograms(allowedPipePrograms, true); + + // Allow workunit debug fields to override most roxie configuration values, for testing/debug purposes. + + topology->getProp("@daliServers", fileNameServiceDali); + const char *wuid = topology->queryProp("@workunit"); + if (wuid) + { + Owned daliHelper; + Owned wu; + daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT)); + wu.setown(daliHelper->attachWorkunit(wuid)); + Owned debugValues = &wu->getDebugValues(); + ForEach (*debugValues) + { + StringBuffer debugStr; + SCMStringBuffer valueStr; + StringBufferAdaptor aDebugStr(debugStr); + debugValues->str(aDebugStr); + if (startsWith(debugStr, "roxie:")) + { + wu->getDebugValue(debugStr.str(), valueStr); + debugStr.replaceString("roxie:", "@"); + topology->setProp(debugStr.str(), valueStr.str()); + } + } + } + if (topology->getPropBool("expert/@profileStartup", false)) { double interval = topology->getPropReal("expert/@profileStartupInterval", 0.2); startupTracer.setInterval(interval); startupTracer.start(); } - localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", false)); // legacy name + localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", localAgent)); // legacy name encryptInTransit = topology->getPropBool("@encryptInTransit", false) && !localAgent; if (encryptInTransit) initSecretUdpKey(); @@ -770,7 +801,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) installDefaultFileHooks(topology); Owned standAloneDll; - const char *wuid = topology->queryProp("@workunit"); if (wuid) setDefaultJobId(wuid); if (topology->hasProp("@loadWorkunit")) @@ -934,6 +964,7 @@ int CCD_API roxie_main(int argc, const char 
*argv[], const char * defaultYaml) } } + minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize); acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); headRegionSize = topology->getPropInt("@headRegionSize", 0); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); @@ -1182,8 +1213,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000); dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000); setRemoteFileTimeouts(dafilesrvLookupTimeout, 0); - topology->getProp("@daliServers", fileNameServiceDali); - getAllowedPipePrograms(allowedPipePrograms, true); trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true); maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000); maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000); diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index b960a9296fa..c1d010d90a4 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -77,7 +77,6 @@ #include "rtldynfield.hpp" #define MAX_HTTP_HEADERSIZE 8000 -#define MIN_PAYLOAD_SIZE 800 #ifdef _WIN32 #pragma warning(disable : 4355) @@ -4148,8 +4147,8 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie void init(unsigned minSize) { assertex(!buffer.length()); - if (minSize < MIN_PAYLOAD_SIZE) - minSize = MIN_PAYLOAD_SIZE; + if (minSize < minPayloadSize) + minSize = minPayloadSize; unsigned headerSize = sizeof(RoxiePacketHeader)+owner.headerLength(); unsigned bufferSize = headerSize+minSize; if (bufferSize < mtu_size) diff --git a/testing/regress/ecl/remote.ecl b/testing/regress/ecl/remote.ecl index d30a892216d..e3f4b5abb28 100644 --- a/testing/regress/ecl/remote.ecl +++ b/testing/regress/ecl/remote.ecl @@ -17,12 +17,18 @@ //nohthor //nothor +//version 
localAgent=true +//version localAgent=false + IMPORT Std.system.thorlib as thorlib; import ^ as root; multiPart := #IFDEFINED(root.multiPart, true); useLocal := #IFDEFINED(root.useLocal, false); useTranslation := #IFDEFINED(root.useTranslation, false); +useLocalAgent := #IFDEFINED(root.localAgent, false); + +#option('roxie:localAgent', useLocalAgent); import $.setup; Files := setup.Files(multiPart, useLocal, useTranslation); diff --git a/testing/regress/ecl/stresstext.ecl b/testing/regress/ecl/stresstext.ecl index 6f7e5013192..3d0a896e589 100644 --- a/testing/regress/ecl/stresstext.ecl +++ b/testing/regress/ecl/stresstext.ecl @@ -23,6 +23,13 @@ //version multiPart=false,variant='inplace_lzw' //version multiPart=false,variant='inplace_lz4hc' +// The settings below may be useful when trying to analyse Roxie keyed join behaviour, as they will +// eliminate some wait time for an agent queue to become available + +//#option('roxie:minPayloadSize', 10000) +//#option('roxie:agentThreads', 400) +//#option('roxie:prestartAgentThreads', true) + import ^ as root; multiPart := #IFDEFINED(root.multiPart, true); variant := #IFDEFINED(root.variant, ''); From da684f027e854857efe2940f6e172d083f5f3181 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 2 Aug 2023 16:22:01 +0100 Subject: [PATCH 2/2] HPCC-29831 Allow localAgent (and other) setting to be set in workunit/regression suite Fix some issues with setting topology when localAgent is false Signed-off-by: Richard Chapman --- roxie/ccd/ccdmain.cpp | 83 ++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 5448ac16787..c028d6dfada 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -465,7 +465,7 @@ void readStaticTopology() std::vector allRoles; IpAddressArray nodeTable; unsigned numNodes = topology->getCount("./RoxieServerProcess"); - if (!numNodes && localAgent) + if (!numNodes && oneShotRoxie) { if 
(topology->getPropBool("expert/@addDummyNode", false)) { @@ -480,7 +480,7 @@ void readStaticTopology() topology->setPropInt("@channelsPerNode", 2); topology->setProp("@agentConfig", "cyclic"); } - else if (localAgent) + else if (oneShotRoxie) { topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "."); numNodes = 1; @@ -732,45 +732,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) encryptInTransit = topology->getPropBool("@encryptInTransit", false) && !localAgent; if (encryptInTransit) initSecretUdpKey(); - numChannels = topology->getPropInt("@numChannels", 0); -#ifdef _CONTAINERIZED - if (!numChannels) - throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set"); -#endif - const char *channels = topology->queryProp("@channels"); - if (channels) - { - StringArray channelSpecs; - channelSpecs.appendList(channels, ",", true); - ForEachItemIn(idx, channelSpecs) - { - char *tail = nullptr; - unsigned channel = strtoul(channelSpecs.item(idx), &tail, 10); - unsigned repl = 0; - if (*tail==':') - { - tail++; - repl = atoi(tail); - } - else if (*tail) - throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s", channels); - agentChannels.push_back(std::pair(channel, repl)); - } -#ifdef _CONTAINERIZED - if (agentChannels.size() != 1) - throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - single channel expected", channels); - myChannel = agentChannels[0].first; - if (myChannel > numChannels) - throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - value out of range", channels); -#endif - } -#ifdef _CONTAINERIZED - else if (localAgent) - { - for (unsigned channel = 1; channel <= numChannels; channel++) - agentChannels.push_back(std::pair(channel, 0)); - } -#endif const char *topos = topology->queryProp("@topologyServers"); StringArray topoValues; if (topos) @@ -844,6 +805,46 @@ int 
CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) runOnce = true; } + numChannels = topology->getPropInt("@numChannels", 0); +#ifdef _CONTAINERIZED + if (!numChannels) + throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set"); +#endif + const char *channels = topology->queryProp("@channels"); + if (channels) + { + StringArray channelSpecs; + channelSpecs.appendList(channels, ",", true); + ForEachItemIn(idx, channelSpecs) + { + char *tail = nullptr; + unsigned channel = strtoul(channelSpecs.item(idx), &tail, 10); + unsigned repl = 0; + if (*tail==':') + { + tail++; + repl = atoi(tail); + } + else if (*tail) + throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s", channels); + agentChannels.push_back(std::pair(channel, repl)); + } +#ifdef _CONTAINERIZED + if (agentChannels.size() != 1) + throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - single channel expected", channels); + myChannel = agentChannels[0].first; + if (myChannel > numChannels) + throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - value out of range", channels); +#endif + } +#ifdef _CONTAINERIZED + else if (oneShotRoxie) + { + for (unsigned channel = 1; channel <= numChannels; channel++) + agentChannels.push_back(std::pair(channel, 0)); + } +#endif + if (!topology->hasProp("@resolveLocally")) topology->setPropBool("@resolveLocally", !topology->hasProp("@daliServers"));