Merge pull request #17663 from richardkchapman/localagent-optimizations
HPCC-30057 Use DataBuffers to store returned rows in localAgent mode

Reviewed-by: Mark Kelly <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
ghalliday authored Sep 8, 2023
2 parents 957d9a6 + 4bf9336 commit bcb0131
Showing 8 changed files with 425 additions and 17 deletions.
14 changes: 14 additions & 0 deletions devdoc/roxie.md
@@ -281,3 +281,17 @@ Should the scope of the blacklist be different? Possible scopes are:

Options 2 and 4 above would allow all aspects of the blacklisting behaviour to be specified by options on the SOAPCALL. We could control whether or not the
blacklister is to be used at all via a SOAPCALL option with any of the above...

Some notes on LocalAgent mode
=============================

In localAgent mode, the global queueManager object (normally a RoxieUdpSocketQueueManager) is replaced by a RoxieLocalQueueManager. Outbound packets are added directly to the target queue; inbound packets are packed into DataBuffers.
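
As a rough illustration of the swap described above, the sketch below picks a queue manager implementation from a mode flag. Every name in it (`IQueueManager`, `UdpQueueManager`, `LocalQueueManager`, `makeQueueManager`, `Packet`) is a hypothetical stand-in invented for this example, not one of the real Roxie classes, and the UDP variant only records packets rather than touching a socket.

```cpp
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins; none of these are the real Roxie types.
struct Packet { std::string payload; };

struct IQueueManager
{
    virtual ~IQueueManager() = default;
    virtual void sendPacket(const Packet &p) = 0;
    virtual const char *name() const = 0;
};

// Networked mode: packets would be handed to a socket layer (only recorded here).
struct UdpQueueManager : IQueueManager
{
    std::vector<Packet> wire;
    void sendPacket(const Packet &p) override { wire.push_back(p); }
    const char *name() const override { return "udp"; }
};

// Local mode: packets go straight onto the in-process agent queue.
struct LocalQueueManager : IQueueManager
{
    std::deque<Packet> agentQueue;
    void sendPacket(const Packet &p) override { agentQueue.push_back(p); }
    const char *name() const override { return "local"; }
};

// Choose the implementation once, at startup, from the mode flag.
std::unique_ptr<IQueueManager> makeQueueManager(bool localAgent)
{
    if (localAgent)
        return std::make_unique<LocalQueueManager>();
    return std::make_unique<UdpQueueManager>();
}
```

Callers only see the common interface, so the rest of the code does not need to know which mode is active.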

There is also a "local optimizations" mode, in which any index operation reading a one-part file (does the same apply to one-part disk files?) reads it directly on the server, regardless of the localAgent setting. The results are typically still injected into the receiver code, as otherwise the handling of exception cases, limits etc. would all be duplicated or messy. Rows created in local optimizations mode are created directly in the caller's row manager, and are injected in serialized format.

Why are inbound rows not created directly in the desired destination's allocator and then marked as serialized? There are some lifespan issues... are they insurmountable?
We do pack into DataBuffers rather than MemoryBuffers, which avoids the need to copy the data before the receiver can use it. Large rows get split and will require copying again, but we could set dataBufferSize to be bigger in localAgent mode to mitigate this somewhat.
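
To make the cost of splitting concrete, here is a minimal sketch of packing a serialized row into fixed-size blocks. The `Block` type and the `packRow`, `needsCopy` and `reassemble` helpers are hypothetical names made up for this example; the real DataBuffer layout and API are not reproduced. It only shows that a row fitting in one block can be used in place, while a row spanning several blocks has to be copied back into contiguous memory.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical fixed-size block; not the real DataBuffer type.
struct Block
{
    std::vector<char> data;
};

// Split one serialized row into blocks of at most blockSize bytes.
// Returns no blocks when blockSize is zero or the row is empty.
std::vector<Block> packRow(const std::string &row, std::size_t blockSize)
{
    std::vector<Block> blocks;
    if (blockSize == 0)
        return blocks;
    for (std::size_t offset = 0; offset < row.size(); offset += blockSize)
    {
        std::size_t len = std::min(blockSize, row.size() - offset);
        Block b;
        b.data.assign(row.data() + offset, row.data() + offset + len);
        blocks.push_back(std::move(b));
    }
    return blocks;
}

// A row held in a single block can be read in place; a split row cannot.
bool needsCopy(const std::vector<Block> &blocks)
{
    return blocks.size() > 1;
}

// Copy the pieces of a split row back into one contiguous buffer.
std::string reassemble(const std::vector<Block> &blocks)
{
    std::string row;
    for (const Block &b : blocks)
        row.append(b.data.begin(), b.data.end());
    return row;
}
```

With a larger block size, fewer rows cross a block boundary, which is the mitigation the note above suggests.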

What is the lifespan issue? In-flight queries may be abandoned when a server-side query fails, times out, or no longer needs the data. DataBuffers do not have this issue, as they are attached to the query's memory manager/allocation once read. Alternatively, we could bypass the agent queue altogether, but that would need rather more refactoring (at that point it might almost be easier to extend the "local optimization" mode to use multiple threads).

The abortPending, replyPending, and abortPendingData methods are unimplemented, which may lead to some inefficiencies.
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
@@ -1605,6 +1605,11 @@
"default": 0,
"description": "Specify an IONICE value for the background copy thread, if backgroundCopyClass set to best-effort."
},
"blockedLocalAgent": {
"type": "boolean",
"default": true,
"description": "Use DataBuffer blocks to return agent data in localAgent mode."
},
"callbackRetries": {
"type": "integer",
"default": 3,
1 change: 1 addition & 0 deletions roxie/ccd/ccd.hpp
@@ -300,6 +300,7 @@ extern IPropertyTree *topology;
extern MapStringTo<int> *preferredClusters;
extern StringArray allQuerySetNames;

extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern bool alwaysTrustFormatCrcs;
17 changes: 15 additions & 2 deletions roxie/ccd/ccdmain.cpp
@@ -68,6 +68,7 @@ unsigned numServerThreads = 30;
unsigned numAgentThreads = 30;
bool prestartAgentThreads = false;
unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned headRegionSize;
@@ -757,8 +758,19 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         else
             setStatisticsComponentName(SCTroxie, "roxie", true);
 #ifdef _CONTAINERIZED
-        getDefaultStoragePlane(defaultPlane);
-        getDefaultIndexBuildStoragePlane(defaultIndexBuildPlane);
+        try
+        {
+            getDefaultStoragePlane(defaultPlane);
+            getDefaultIndexBuildStoragePlane(defaultIndexBuildPlane);
+        }
+        catch (IException *E)
+        {
+#ifdef _DEBUG
+            E->Release(); // Useful for some local testing to be able to ignore these configuration errors
+#else
+            throw;
+#endif
+        }
 #endif
         installDefaultFileHooks(topology);

@@ -967,6 +979,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
}

minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize);
blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
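
The lines in this hunk all follow one pattern: a global holds the compiled-in default, and the configuration lookup is given that same global as its fallback, so the value only changes when the attribute is present. A minimal sketch of that pattern follows, using a plain `std::map` in place of the property tree. `Attributes` and `getBoolOr` are hypothetical names for this example, and the string parsing rule is an assumption, not the behaviour of the real `getPropBool`.

```cpp
#include <map>
#include <string>

// Hypothetical attribute store standing in for the topology property tree.
using Attributes = std::map<std::string, std::string>;

// Hypothetical helper: returns dft when the attribute is absent.
// Assumption: only "true" and "1" count as true in this sketch.
bool getBoolOr(const Attributes &attrs, const std::string &name, bool dft)
{
    auto it = attrs.find(name);
    if (it == attrs.end())
        return dft;
    return it->second == "true" || it->second == "1";
}
```

Passing the current value as the default means an unset attribute leaves the compiled-in setting untouched.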