diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index b818edc53d9..5f4a2c13f2a 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -304,6 +304,7 @@ extern StringArray allQuerySetNames; extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern unsigned packetAcknowledgeTimeout; +extern unsigned dynPriorityAdjustTime; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; extern bool lockSuperFiles; @@ -333,8 +334,8 @@ extern unsigned memoryStatsInterval; extern unsigned pingInterval; extern unsigned socketCheckInterval; extern memsize_t defaultMemoryLimit; -extern unsigned defaultTimeLimit[4]; -extern unsigned defaultWarnTimeLimit[4]; +extern unsigned defaultTimeLimit[3]; +extern unsigned defaultWarnTimeLimit[3]; extern unsigned defaultThorConnectTimeout; extern bool pretendAllOpt; extern ClientCertificate clientCert; diff --git a/roxie/ccd/ccdcontext.cpp b/roxie/ccd/ccdcontext.cpp index e064a6552cb..0ec12cec04a 100644 --- a/roxie/ccd/ccdcontext.cpp +++ b/roxie/ccd/ccdcontext.cpp @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext return options; } + virtual unsigned __int64 queryElapsedNS() const + { + return nsTick() - (1000000ULL * startTime); + } + const char *queryAuthToken() { return authToken.str(); diff --git a/roxie/ccd/ccdcontext.hpp b/roxie/ccd/ccdcontext.hpp index a9968029a03..fd39a151b84 100644 --- a/roxie/ccd/ccdcontext.hpp +++ b/roxie/ccd/ccdcontext.hpp @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0; virtual roxiemem::IRowManager &queryRowManager() = 0; virtual const QueryOptions &queryOptions() const = 0; + virtual unsigned __int64 queryElapsedNS() const = 0; virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0; virtual const char *queryAuthToken() = 0; virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 3d4262b6922..8b82a243c18 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5; bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; +unsigned dynPriorityAdjustTime = 0; // default off (0) unsigned headRegionSize; unsigned ccdMulticastPort; bool enableHeartBeat = true; @@ -164,8 +165,8 @@ int backgroundCopyPrio = 0; unsigned memoryStatsInterval = 0; memsize_t defaultMemoryLimit; -unsigned defaultTimeLimit[4] = {0, 0, 0, 0}; -unsigned defaultWarnTimeLimit[4] = {0, 5000, 5000, 10000}; +unsigned defaultTimeLimit[3] = {0, 0, 0}; +unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000}; unsigned defaultThorConnectTimeout; unsigned defaultParallelJoinPreload = 0; @@ -1007,6 +1008,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); headRegionSize = topology->getPropInt("@headRegionSize", 0); packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout); + dynPriorityAdjustTime = topology->getPropInt("@dynPriorityAdjustTime", 0); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0)); @@ -1169,11 +1171,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0); defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0); defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0); - defaultTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeLimit", 0); defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0); defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0); defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0); - defaultWarnTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeWarning", 0); defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60); continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index 2ec6239e245..85bd548771f 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -319,6 +319,7 @@ class CSharedOnceContext : public CInterfaceOf QueryOptions::QueryOptions() { priority = 0; + dynPriority = 0; timeLimit = defaultTimeLimit[0]; warnTimeLimit = defaultWarnTimeLimit[0]; @@ -358,6 +359,7 @@ QueryOptions::QueryOptions() QueryOptions::QueryOptions(const QueryOptions &other) { priority = other.priority; + dynPriority = other.dynPriority; timeLimit = other.timeLimit; warnTimeLimit = other.warnTimeLimit; @@ -400,10 +402,12 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat updateFromWorkUnit(priority, wu, "priority"); if (stateInfo) updateFromContext(priority, stateInfo, "@priority"); + dynPriority = priority; if ((int)priority < 0) { - timeLimit = defaultTimeLimit[3]; - warnTimeLimit = defaultWarnTimeLimit[3]; + // use LOW queue time limits ... + timeLimit = defaultTimeLimit[0]; + warnTimeLimit = defaultWarnTimeLimit[0]; } else { @@ -494,6 +498,31 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx) if (ctx) { // Note: priority cannot be set at context level + // b/c this is after activities have been created, but we could + // dynamically adj priority in the header activityId before sending + int tmpPriority; + updateFromContext(tmpPriority, ctx, "@priority", "_Priority"); + if (tmpPriority > 1) + tmpPriority = 1; + if (tmpPriority < -1) + tmpPriority = -1; + + if (tmpPriority < (int)priority) + { + dynPriority = tmpPriority; + if (dynPriority < 0) + { + // use LOW queue time limits ... + timeLimit = defaultTimeLimit[0]; + warnTimeLimit = defaultWarnTimeLimit[0]; + } + else + { + timeLimit = defaultTimeLimit[dynPriority]; + warnTimeLimit = defaultWarnTimeLimit[dynPriority]; + } + } + updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit"); updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit"); updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit"); diff --git a/roxie/ccd/ccdquery.hpp b/roxie/ccd/ccdquery.hpp index f4f45d12fd4..3dbf694c84a 100644 --- a/roxie/ccd/ccdquery.hpp +++ b/roxie/ccd/ccdquery.hpp @@ -80,8 +80,8 @@ class QueryOptions void setFromContext(const IPropertyTree *ctx); void setFromAgentLoggingFlags(unsigned loggingFlags); - unsigned priority; + int dynPriority; unsigned timeLimit; unsigned warnTimeLimit; unsigned traceLimit; diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index a79264cf54d..9023203d138 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1936,7 +1936,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); - bgQueue.start(); + bgQueue.start(); // consider nice(+3) BG threads } virtual void stop() diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index b4e3cd3f051..fa3c46b0b2d 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -316,6 +316,10 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface { return ctx->queryOptions(); } + virtual unsigned __int64 queryElapsedNS() const override + { + return ctx->queryElapsedNS(); + } virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) override { ctx->addAgentsReplyLen(len, duplicates, resends); @@ -4557,9 +4561,37 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie // But this could still cause too many reply packets on the fastlane // (higher priority output Q), which may cause the activities on the // low priority output Q to not get service on time. + + int origPriority = (int)ctx->queryOptions().priority; + int dynPriority = ctx->queryOptions().dynPriority; + if (dynPriority < origPriority) + { + unsigned newPri = ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY; + switch (dynPriority) + { + case 1: + newPri = ROXIE_HIGH_PRIORITY; + break; + case 0: + newPri = ROXIE_LOW_PRIORITY; + break; + } + p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; + p->queryHeader().activityId |= newPri; + } + + if ( (dynPriorityAdjustTime > 0) && (origPriority == 0) && (dynPriority == 0) && + (ctx->queryElapsedNS()/1000000ULL > dynPriorityAdjustTime) ) + { + // TODO: should we update options.dynPriority or otherwise so we don't + // have to calculate elapsed every time ? + p->queryHeader().activityId |= (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY); + // TODO: what to do about still running activities' continuation/ack priorities ? + } + unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK; if ((colocalArg == 0) && // not a child query activity?? - (pmask && (pmask != ROXIE_PRIORITY_MASK)) && + (pmask && (pmask != (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY))) && (p->queryHeader().overflowSequence == 0) && (p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0) p->queryHeader().retries |= ROXIE_FASTLANE; @@ -5014,7 +5046,18 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie mu.clear(); SimpleActivityTimer t(unpackerWaitCycles, timeActivities); unsigned ctxTraceLevel = activity.queryLogCtx().queryTraceLevel(); - unsigned timeout = remoteId.isSLAPriority() ? slaTimeout : (remoteId.isHighPriority() ? highTimeout : lowTimeout); + + unsigned timeout = lowTimeout; + switch (activity.queryContext()->queryOptions().dynPriority) + { + case 2: + timeout = slaTimeout; + break; + case 1: + timeout = highTimeout; + break; + } + unsigned checkInterval = activity.queryContext()->checkInterval(); if (checkInterval > timeout) checkInterval = timeout; diff --git a/roxie/ccd/ccdstate.cpp b/roxie/ccd/ccdstate.cpp index 2108347a486..1b239971d9c 100644 --- a/roxie/ccd/ccdstate.cpp +++ b/roxie/ccd/ccdstate.cpp @@ -2439,16 +2439,6 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0); topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]); } - else if (stricmp(queryName, "control:defaultBGPriorityTimeLimit")==0) - { - defaultTimeLimit[3] = control->getPropInt("@limit", 0); - topology->setPropInt("@defaultBGPriorityTimeLimit", defaultTimeLimit[3]); - } - else if (stricmp(queryName, "control:defaultBGPriorityTimeWarning")==0) - { - defaultWarnTimeLimit[3] = control->getPropInt("@limit", 0); - topology->setPropInt("@defaultBGPriorityTimeWarning", defaultWarnTimeLimit[3]); - } else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0) { UNIMPLEMENTED;