From a5baecd92e1121b4a2af3e9671690765eee46fd8 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 8 Feb 2024 17:13:27 +0000 Subject: [PATCH] HPCC-31290 Fix Sasha Thor QMon switching issues Automatic and manual queue switching by the Sasha QMon service have not worked since 7.12. Changes to the format of the queued thor job items meant it did not find any queued items to swap. This meant that workunits submitted with 'allowedclusters' and 'allowautoqueueswitch', that should have automatically switched to an idle Thor queue in the 'allowedclusters' set when the queue they were submitted to was busy, did not. Also fix the qmon tracing, which was supposed to trace the current workunits in flight running on Thor instances. That appears not to have worked well before 7.12. Signed-off-by: Jake Smith --- common/workunit/workunit.cpp | 47 ++++++++++++------ common/workunit/workunit.hpp | 4 +- common/workunit/workunit.ipp | 2 +- common/workunit/wujobq.cpp | 41 ++++++++++------ dali/sasha/saqmon.cpp | 94 +++++++++++++++++++++++------------- ecl/wutest/wujobqtest2.cpp | 4 +- 6 files changed, 124 insertions(+), 68 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 86010368853..4724828d7b0 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -4579,8 +4579,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa virtual void deserialize(MemoryBuffer &src) { c->deserialize(src); } - virtual bool switchThorQueue(const char *cluster, IQueueSwitcher *qs) - { return c->switchThorQueue(cluster,qs); } + virtual bool switchThorQueue(const char *cluster, IQueueSwitcher *qs, const char *item) + { return c->switchThorQueue(cluster, qs, item); } virtual void setAllowedClusters(const char *value) { c->setAllowedClusters(value); } virtual IStringVal& getAllowedClusters(IStringVal &str) const @@ -9852,28 +9852,45 @@ IPropertyTreeIterator & CLocalWorkUnit::getFilesReadIterator() const //================================================================================================= - -bool CLocalWorkUnit::switchThorQueue(const char *cluster, IQueueSwitcher *qs) +// queued Thor jobs take the form : // +bool CLocalWorkUnit::switchThorQueue(const char *newCluster, IQueueSwitcher *qs, const char *item) { CriticalBlock block(crit); if (qs->isAuto()&&!getAllowAutoQueueSwitch()) return false; - const char * currentcluster = queryClusterName(); - const char *wuid = p->queryName(); StringBuffer curqname; - getClusterThorQueueName(curqname, currentcluster); + getClusterThorQueueName(curqname, queryClusterName()); + StringBuffer newqname; + getClusterThorQueueName(newqname, newCluster); - void *qi = qs->getQ(curqname.str(),wuid); - if (!qi) - return false; + bool oneItem = !isEmptyString(item); + StringBuffer tmpItem; + if (!oneItem) + { + // All items in a Thor job queue are of the form // + // When switching items from a queue, and no item has been specified, + // we need to switch all items that belong to the same workunit. + // NB: This scenario happens if switching is invoked at the workunit level by the user/soap call. + // The standard thor queue switching mechanism always names a specific item. + tmpItem.appendf("*/%s/*", p->queryName()); + item = tmpItem; + } - setClusterName(cluster); + setClusterName(newCluster); - StringBuffer newqname; - getClusterThorQueueName(newqname, cluster); - qs->putQ(newqname.str(),wuid,qi); - return true; + bool res = false; + while (true) + { + void *qi = qs->getQ(curqname.str(), item); + if (!qi) + break; + qs->putQ(newqname.str(), qi); + res = true; + if (oneItem) + break; + } + return res; } diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index a6896a43162..67ba440f36e 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -85,7 +85,7 @@ typedef unsigned __int64 __uint64; interface IQueueSwitcher : extends IInterface { virtual void * getQ(const char * qname, const char * wuid) = 0; - virtual void putQ(const char * qname, const char * wuid, void * qitem) = 0; + virtual void putQ(const char * qname, void * qitem) = 0; virtual bool isAuto() = 0; }; @@ -1374,7 +1374,7 @@ interface IWorkUnit : extends IConstWorkUnit virtual void noteFileRead(IDistributedFile * file) = 0; virtual void noteFieldUsage(IPropertyTree * file) = 0; virtual void resetBeforeGeneration() = 0; - virtual bool switchThorQueue(const char * newcluster, IQueueSwitcher * qs) = 0; + virtual bool switchThorQueue(const char * newcluster, IQueueSwitcher * qs, const char *item) = 0; virtual void setAllowedClusters(const char * value) = 0; virtual void setAllowAutoQueueSwitch(bool val) = 0; virtual void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash) = 0; diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index d2d7156258d..975f62565e7 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ -385,7 +385,7 @@ public: IWorkUnit &lockRemote(bool commit); void unlockRemote(); void abort(); - bool switchThorQueue(const char *cluster, IQueueSwitcher *qs); + bool switchThorQueue(const char *cluster, IQueueSwitcher *qs, const char *item); void setAllowedClusters(const char *value); IStringVal & getAllowedClusters(IStringVal & str) const; void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const; diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 0a5bd0576a1..8470cb154fc 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -1404,20 +1404,31 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0) { + // will match and remove 1st item with priority >= minprio StringBuffer path; - IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str()); - if (!item) - return NULL; - if (item->getPropInt("@num",0)<=0) - return NULL; // don't want (old) cached value - if (hasminprio&&(item->getPropInt("@priority")getPropInt("@count"); - assertex(count); - qd.root->setPropInt("@count",count-1); - return ret; + Owned iter = qd.root->getElements(getItemPath(path,wuid).str()); + if (iter->first()) + { + while (true) + { + IPropertyTree *item = &iter->query(); + if ((item->getPropInt("@num",0) > 0)) // don't want (old) cached value + { + if (!hasminprio || (item->getPropInt("@priority") >= minprio)) + { + IJobQueueItem *ret = new CJobQueueItem(item); + removeItem(qd,item,saveitem); + unsigned count = qd.root->getPropInt("@count"); + assertex(count); + qd.root->setPropInt("@count",count-1); + return ret; + } + } + if (!iter->next()) + break; + } + } + return nullptr; } IJobQueueItem *take(sQueueData &qd,const char *wuid) @@ -2246,7 +2257,7 @@ extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster) Owned q = createJobQueue(qname); return q->take(wuid); } - void putQ(const char * qname, const char * wuid, void * qitem) + void putQ(const char * qname, void * qitem) { Owned q = createJobQueue(qname); q->enqueue((IJobQueueItem *)qitem); @@ -2257,5 +2268,5 @@ extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster) } } switcher; - return wu->switchThorQueue(cluster, &switcher); + return wu->switchThorQueue(cluster, &switcher, nullptr); } diff --git a/dali/sasha/saqmon.cpp b/dali/sasha/saqmon.cpp index 932e5416b56..db61fc6ea76 100644 --- a/dali/sasha/saqmon.cpp +++ b/dali/sasha/saqmon.cpp @@ -8,6 +8,7 @@ #include "dasds.hpp" #include "daaudit.hpp" +#include "daqueue.hpp" #include "saserver.hpp" #include "workunit.hpp" #include "wujobq.hpp" @@ -110,7 +111,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread } - bool doSwitch(const char *wuid,const char *cluster) + bool doSwitch(const char *item, const char *wuid, const char *cluster) { class cQswitcher: public CInterface, implements IQueueSwitcher { @@ -145,7 +146,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread return NULL; return q->take(wuid); } - void putQ(const char * qname, const char * wuid, void * qitem) + void putQ(const char * qname, void * qitem) { IJobQueue *q = findQueue(qname); if (q) @@ -162,7 +163,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread Owned factory = getWorkUnitFactory(); Owned wu = factory->updateWorkUnit(wuid); if (wu) - return wu->switchThorQueue(cluster, &switcher); + return wu->switchThorQueue(cluster, &switcher, item); return false; } @@ -175,31 +176,37 @@ class CSashaQMonitorServer: public ISashaServer, public Thread // see if can find candidate on another queue Owned factory = getWorkUnitFactory(); - ForEachItemIn(i1,queues) { - if (i1!=qi) { + ForEachItemIn(i1,queues) + { + if (i1!=qi) + { IJobQueue &srcq = queues.item(i1); CJobQueueContents qc; srcq.copyItems(qc); Owned iter = qc.getIterator(); - ForEach(*iter) { + ForEach(*iter) + { const char *wuidGraph = iter->query().queryWUID(); - if (!isEmptyString(wuidGraph)) { - const char *sep = strchr(wuidGraph, '/'); - StringAttr wuid; - if (sep) - wuid.set(wuidGraph, sep-wuidGraph); - else - wuid.set(wuidGraph); + if (!isEmptyString(wuidGraph)) + { + StringArray sArray; + sArray.appendList(wuidGraph, "/"); + assertex(3 == sArray.ordinality()); + const char *wuid = sArray.item(1); Owned wu = factory->openWorkUnit(wuid); - if (wu) { + if (wu) + { SCMStringBuffer allowedClusters; - if (wu->getAllowedClusters(allowedClusters).length()) { + if (wu->getAllowedClusters(allowedClusters).length()) + { StringArray acs; acs.appendListUniq(allowedClusters.str(), ","); bool found = true; - ForEachItemIn(i,acs) { + const char *cn = cnames.item(qi); + ForEachItemIn(i,acs) + { if (strcmp(cnames.item(qi),acs.item(i))==0) - return doSwitch(wuid,acs.item(i)); + return doSwitch(wuidGraph, wuid, acs.item(i)); } } } @@ -222,12 +229,14 @@ class CSashaQMonitorServer: public ISashaServer, public Thread if (!initQueueNames(qmonprops->queryProp("@queues"))) return 0; Owned conn = querySDS().connect("Status/Servers", myProcessSession(), 0, 100000); - if (!conn) { + if (!conn) + { OERRLOG("cannot connect to Status/Servers"); return -1; } unsigned *qidlecount = new unsigned[qnames.ordinality()]; - ForEachItemIn(i1,qnames) { + ForEachItemIn(i1,qnames) + { StringBuffer qname(qnames.item(i1)); qname.append(".thor"); queues.append(*createJobQueue(qname.str())); @@ -235,23 +244,38 @@ class CSashaQMonitorServer: public ISashaServer, public Thread } unsigned sleeptime = autoswitch?1:interval; unsigned moninter = interval; - while (!stopped) { + while (!stopped) + { stopsem.wait(60*1000*sleeptime); if (stopped) break; moninter-=sleeptime; - if (autoswitch||(moninter==0)) { // always true at moment - try { + if (autoswitch||(moninter==0)) // always true at moment + { + try + { conn->reload(); - ForEachItemIn(qi,qnames) { + ForEachItemIn(qi,qnames) + { + StringBuffer thorQName; const char *qname = qnames.item(qi); - StringBuffer xpath; - xpath.appendf("Server[@queue=\"%s.thor\"]/WorkUnit",qname); - Owned iter = conn->queryRoot()->getElements(xpath.str()); + getClusterThorQueueName(thorQName, qname); + Owned iter = conn->queryRoot()->getElements("Server[@queue]"); StringArray wuids; - ForEach(*iter) { - IPropertyTree &wu = iter->query(); - wuids.append(wu.queryProp(NULL)); + ForEach(*iter) + { + IPropertyTree &server = iter->query(); + const char *wuid = server.queryProp("WorkUnit"); + if (isEmptyString(wuid)) + continue; + const char *queues = server.queryProp("@queue"); + if (isEmptyString(queues)) + continue; + StringArray queueList; + queueList.appendList(queues, ","); + if (!queueList.contains(thorQName)) + continue; + wuids.append(wuid); } unsigned enqueued=0; unsigned connected=0; @@ -265,14 +289,18 @@ class CSashaQMonitorServer: public ISashaServer, public Thread qidlecount[qi] = 0; } } - catch (IException *e) { + catch (IException *e) + { StringBuffer s; EXCLOG(e, "QMONITOR"); e->Release(); } - if (autoswitch) { - ForEachItemIn(qi2,qnames) { - if (qidlecount[qi2]>autoswitch) { // > not >= to get conservative estimate of how long idle + if (autoswitch) + { + ForEachItemIn(qi2,qnames) + { + if (qidlecount[qi2]>autoswitch) // > not >= to get conservative estimate of how long idle + { if (switchQueues(qi2)) break; // only switch one per cycle (bit of cop-out) } diff --git a/ecl/wutest/wujobqtest2.cpp b/ecl/wutest/wujobqtest2.cpp index 54db876200e..e8c5e0eecb1 100644 --- a/ecl/wutest/wujobqtest2.cpp +++ b/ecl/wutest/wujobqtest2.cpp @@ -41,7 +41,7 @@ bool switchWorkunitQueue(const char *wuid, const char *cluster) Owned q = createJobQueue(qname); return q->take(wuid); } - void putQ(const char * qname, const char * wuid, void * qitem) + void putQ(const char * qname, void * qitem) { Owned q = createJobQueue(qname); q->enqueue((IJobQueueItem *)qitem); @@ -56,6 +56,6 @@ bool switchWorkunitQueue(const char *wuid, const char *cluster) Owned wu = factory->updateWorkUnit(wuid); if (!wu) return false; - return wu->switchThorQueue(cluster, &switcher); + return wu->switchThorQueue(cluster, &switcher, nullptr); }