Skip to content

Commit

Permalink
Merge pull request #17513 from richardkchapman/localagent-wu
Browse files Browse the repository at this point in the history
HPCC-29831 Allow localAgent setting to be set in workunit/regression suite

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 9, 2023
2 parents c9d6d57 + da684f0 commit 1e7caef
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 58 deletions.
36 changes: 27 additions & 9 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8500,7 +8500,9 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool a
bool CLocalWorkUnit::hasDebugValue(const char *propname) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
return p->hasProp(prop.append(lower));
Expand All @@ -8509,7 +8511,9 @@ bool CLocalWorkUnit::hasDebugValue(const char *propname) const
IStringVal& CLocalWorkUnit::getDebugValue(const char *propname, IStringVal &str) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
str.set(p->queryProp(prop.append(lower).str()));
Expand All @@ -8528,7 +8532,9 @@ IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const
if (prop)
{
StringBuffer lower;
lower.append(prop).toLowerCase();
lower.append(prop);
if (!strchr(lower, ':'))
lower.toLowerCase();
path.append(lower);
}
else
Expand All @@ -8539,7 +8545,9 @@ IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const
int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8549,7 +8557,9 @@ int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const
__int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8559,7 +8569,9 @@ __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal)
double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8569,7 +8581,9 @@ double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) co
bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand Down Expand Up @@ -8642,7 +8656,9 @@ void CLocalWorkUnit::addProcess(const char *type, const char *instance, unsigned
void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite)
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8657,7 +8673,9 @@ void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool
void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite)
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ extern bool defaultExecuteDependenciesSequentially;
extern bool defaultStartInputsSequentially;
extern bool oneShotRoxie;
extern bool traceStrands;
extern unsigned minPayloadSize;

extern int backgroundCopyClass;
extern int backgroundCopyPrio;
Expand Down
122 changes: 76 additions & 46 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ bool mergeAgentStatistics = true;
PTreeReaderOptions defaultXmlReadFlags = ptr_ignoreWhiteSpace;
bool runOnce = false;
bool oneShotRoxie = false;
unsigned minPayloadSize = 800;

unsigned udpMulticastBufferSize = 262142;
#if !defined(_CONTAINERIZED) && !defined(SUBCHANNELS_IN_HEADER)
Expand All @@ -122,7 +123,7 @@ bool lockSuperFiles;
bool useRemoteResources;
bool checkFileDate;
bool lazyOpen;
bool localAgent;
bool localAgent = false;
bool encryptInTransit;
bool useAeron;
bool ignoreOrphans;
Expand Down Expand Up @@ -464,7 +465,7 @@ void readStaticTopology()
std::vector<RoxieEndpointInfo> allRoles;
IpAddressArray nodeTable;
unsigned numNodes = topology->getCount("./RoxieServerProcess");
if (!numNodes && localAgent)
if (!numNodes && oneShotRoxie)
{
if (topology->getPropBool("expert/@addDummyNode", false))
{
Expand All @@ -479,7 +480,7 @@ void readStaticTopology()
topology->setPropInt("@channelsPerNode", 2);
topology->setProp("@agentConfig", "cyclic");
}
else if (localAgent)
else if (oneShotRoxie)
{
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", ".");
numNodes = 1;
Expand Down Expand Up @@ -691,55 +692,46 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
useOldTopology = checkFileExists(topologyFile.str());
topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
saveTopology();

// Any settings we read from topology that must NOT be overridden in workunit debug fields should be read at this point, before the following section
getAllowedPipePrograms(allowedPipePrograms, true);

// Allow workunit debug fields to override most roxie configuration values, for testing/debug purposes.

topology->getProp("@daliServers", fileNameServiceDali);
const char *wuid = topology->queryProp("@workunit");
if (wuid)
{
Owned<IRoxieDaliHelper> daliHelper;
Owned<IConstWorkUnit> wu;
daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
wu.setown(daliHelper->attachWorkunit(wuid));
Owned<IStringIterator> debugValues = &wu->getDebugValues();
ForEach (*debugValues)
{
StringBuffer debugStr;
SCMStringBuffer valueStr;
StringBufferAdaptor aDebugStr(debugStr);
debugValues->str(aDebugStr);
if (startsWith(debugStr, "roxie:"))
{
wu->getDebugValue(debugStr.str(), valueStr);
debugStr.replaceString("roxie:", "@");
topology->setProp(debugStr.str(), valueStr.str());
}
}
}

if (topology->getPropBool("expert/@profileStartup", false))
{
double interval = topology->getPropReal("expert/@profileStartupInterval", 0.2);
startupTracer.setInterval(interval);
startupTracer.start();
}
localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", false)); // legacy name
localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", localAgent)); // legacy name
encryptInTransit = topology->getPropBool("@encryptInTransit", false) && !localAgent;
if (encryptInTransit)
initSecretUdpKey();
numChannels = topology->getPropInt("@numChannels", 0);
#ifdef _CONTAINERIZED
if (!numChannels)
throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
#endif
const char *channels = topology->queryProp("@channels");
if (channels)
{
StringArray channelSpecs;
channelSpecs.appendList(channels, ",", true);
ForEachItemIn(idx, channelSpecs)
{
char *tail = nullptr;
unsigned channel = strtoul(channelSpecs.item(idx), &tail, 10);
unsigned repl = 0;
if (*tail==':')
{
tail++;
repl = atoi(tail);
}
else if (*tail)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s", channels);
agentChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
}
#ifdef _CONTAINERIZED
if (agentChannels.size() != 1)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - single channel expected", channels);
myChannel = agentChannels[0].first;
if (myChannel > numChannels)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - value out of range", channels);
#endif
}
#ifdef _CONTAINERIZED
else if (localAgent)
{
for (unsigned channel = 1; channel <= numChannels; channel++)
agentChannels.push_back(std::pair<unsigned, unsigned>(channel, 0));
}
#endif
const char *topos = topology->queryProp("@topologyServers");
StringArray topoValues;
if (topos)
Expand Down Expand Up @@ -770,7 +762,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
installDefaultFileHooks(topology);

Owned<const IQueryDll> standAloneDll;
const char *wuid = topology->queryProp("@workunit");
if (wuid)
setDefaultJobId(wuid);
if (topology->hasProp("@loadWorkunit"))
Expand Down Expand Up @@ -814,6 +805,46 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
runOnce = true;
}

numChannels = topology->getPropInt("@numChannels", 0);
#ifdef _CONTAINERIZED
if (!numChannels)
throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
#endif
const char *channels = topology->queryProp("@channels");
if (channels)
{
StringArray channelSpecs;
channelSpecs.appendList(channels, ",", true);
ForEachItemIn(idx, channelSpecs)
{
char *tail = nullptr;
unsigned channel = strtoul(channelSpecs.item(idx), &tail, 10);
unsigned repl = 0;
if (*tail==':')
{
tail++;
repl = atoi(tail);
}
else if (*tail)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s", channels);
agentChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
}
#ifdef _CONTAINERIZED
if (agentChannels.size() != 1)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - single channel expected", channels);
myChannel = agentChannels[0].first;
if (myChannel > numChannels)
throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s - value out of range", channels);
#endif
}
#ifdef _CONTAINERIZED
else if (oneShotRoxie)
{
for (unsigned channel = 1; channel <= numChannels; channel++)
agentChannels.push_back(std::pair<unsigned, unsigned>(channel, 0));
}
#endif

if (!topology->hasProp("@resolveLocally"))
topology->setPropBool("@resolveLocally", !topology->hasProp("@daliServers"));

Expand Down Expand Up @@ -934,6 +965,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
}
}

minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
Expand Down Expand Up @@ -1182,8 +1214,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000);
dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000);
setRemoteFileTimeouts(dafilesrvLookupTimeout, 0);
topology->getProp("@daliServers", fileNameServiceDali);
getAllowedPipePrograms(allowedPipePrograms, true);
trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true);
maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000);
maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000);
Expand Down
5 changes: 2 additions & 3 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
#include "rtldynfield.hpp"

#define MAX_HTTP_HEADERSIZE 8000
#define MIN_PAYLOAD_SIZE 800

#ifdef _WIN32
#pragma warning(disable : 4355)
Expand Down Expand Up @@ -4152,8 +4151,8 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
void init(unsigned minSize)
{
assertex(!buffer.length());
if (minSize < MIN_PAYLOAD_SIZE)
minSize = MIN_PAYLOAD_SIZE;
if (minSize < minPayloadSize)
minSize = minPayloadSize;
unsigned headerSize = sizeof(RoxiePacketHeader)+owner.headerLength();
unsigned bufferSize = headerSize+minSize;
if (bufferSize < mtu_size)
Expand Down
6 changes: 6 additions & 0 deletions testing/regress/ecl/remote.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

//nohthor
//nothor
//version localAgent=true
//version localAgent=false


IMPORT Std.system.thorlib as thorlib;
import ^ as root;
multiPart := #IFDEFINED(root.multiPart, true);
useLocal := #IFDEFINED(root.useLocal, false);
useTranslation := #IFDEFINED(root.useTranslation, false);
useLocalAgent := #IFDEFINED(root.localAgent, false);

#option('roxie:localAgent', useLocalAgent);

import $.setup;
Files := setup.Files(multiPart, useLocal, useTranslation);
Expand Down
7 changes: 7 additions & 0 deletions testing/regress/ecl/stresstext.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
//version multiPart=false,variant='inplace_lzw'
//version multiPart=false,variant='inplace_lz4hc'

// The settings below may be useful when trying to analyse Roxie keyed join behaviour, as they will
// eliminate some wait time for an agent queue to become available

//#option('roxie:minPayloadSize', 10000)
//#option('roxie:agentThreads', 400)
//#option('roxie:prestartAgentThreads', true)

import ^ as root;
multiPart := #IFDEFINED(root.multiPart, true);
variant := #IFDEFINED(root.variant, '');
Expand Down

0 comments on commit 1e7caef

Please sign in to comment.