From 46af3a6bafd613829e3ca2da31a0a4fb6426f2ea Mon Sep 17 00:00:00 2001 From: M Kelly Date: Tue, 31 Dec 2024 10:45:13 -0500 Subject: [PATCH] HPCC-32958 Roxie dynamic priority 2 Signed-off-by: M Kelly --- roxie/ccd/ccd.hpp | 1 + roxie/ccd/ccdqueue.cpp | 12 ++++++------ roxie/ccd/ccdserver.cpp | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 5f4a2c13f2a..909e365adad 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -79,6 +79,7 @@ void setMulticastEndpoints(unsigned numChannels); #define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue #define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default) // background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set +#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY) #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 9023203d138..5d0d52c0d6c 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -137,7 +137,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const { const IpAddress serverIP = serverId.getIpAddress(); ret.append("activityId="); - switch(activityId & ~ROXIE_PRIORITY_MASK) + switch (activityId & ~ROXIE_PRIORITY_MASK) { case 0: ret.append("IBYTI"); break; case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break; @@ -157,12 +157,12 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const break; } ret.appendf(" uid=" RUIDF " pri=", uid); - switch(activityId & ROXIE_PRIORITY_MASK) + switch (activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break; case ROXIE_LOW_PRIORITY: ret.append("LOW"); break; - case ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY: ret.append("BG"); break; + case ROXIE_BG_PRIORITY: ret.append("BG"); break; default: ret.append("???"); break; } ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence); @@ -2279,7 +2279,7 @@ class DelayedPacketQueue DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str()); } unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp(); - switch(header.activityId & ROXIE_PRIORITY_MASK) + switch (header.activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break; case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break; @@ -2904,7 +2904,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase StringBuffer s; DBGLOG("Read roxie packet: %s", header.toString(s).str()); } - switch(header.activityId & ROXIE_PRIORITY_MASK) + switch (header.activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break; case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break; @@ -3719,7 +3719,7 @@ class RoxieLocalQueueManager : public RoxieReceiverBase return; // No point sending the retry in localAgent mode } RoxieQueue *targetQueue; - switch(header.activityId & ROXIE_PRIORITY_MASK) + switch (header.activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break; case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break; diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index afd00d04e00..ebd6e38460f 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4592,7 +4592,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK; if ((colocalArg == 0) && // not a child query activity?? - (pmask && (pmask != (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY))) && + ( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) && (p->queryHeader().overflowSequence == 0) && (p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0) p->queryHeader().retries |= ROXIE_FASTLANE;