Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31290 Fix Sasha Thor QMon switching issues #18302

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4579,8 +4579,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
virtual void deserialize(MemoryBuffer &src)
{ c->deserialize(src); }

virtual bool switchThorQueue(const char *cluster, IQueueSwitcher *qs)
{ return c->switchThorQueue(cluster,qs); }
virtual bool switchThorQueue(const char *cluster, IQueueSwitcher *qs, const char *item)
{ return c->switchThorQueue(cluster, qs, item); }
virtual void setAllowedClusters(const char *value)
{ c->setAllowedClusters(value); }
virtual IStringVal& getAllowedClusters(IStringVal &str) const
Expand Down Expand Up @@ -9852,28 +9852,45 @@ IPropertyTreeIterator & CLocalWorkUnit::getFilesReadIterator() const

//=================================================================================================


bool CLocalWorkUnit::switchThorQueue(const char *cluster, IQueueSwitcher *qs)
// queued Thor jobs take the form : <wfid>/<workunit>/<graphName>
bool CLocalWorkUnit::switchThorQueue(const char *newCluster, IQueueSwitcher *qs, const char *item)
{
CriticalBlock block(crit);
if (qs->isAuto()&&!getAllowAutoQueueSwitch())
return false;

const char * currentcluster = queryClusterName();
const char *wuid = p->queryName();
StringBuffer curqname;
getClusterThorQueueName(curqname, currentcluster);
getClusterThorQueueName(curqname, queryClusterName());
StringBuffer newqname;
getClusterThorQueueName(newqname, newCluster);

void *qi = qs->getQ(curqname.str(),wuid);
if (!qi)
return false;
bool oneItem = !isEmptyString(item);
StringBuffer tmpItem;
if (!oneItem)
{
// All items in a Thor job queue are of the form <wfid>/<workunit>/<graphName>
// When switching items from a queue, and no item has been specified,
// we need to switch all items that belong to the same workunit.
// NB: This scenario happens if switching is invoked at the workunit level by the user/soap call.
// The standard thor queue switching mechanism always names a specific item.
tmpItem.appendf("*/%s/*", p->queryName());
item = tmpItem;
}

setClusterName(cluster);
setClusterName(newCluster);

StringBuffer newqname;
getClusterThorQueueName(newqname, cluster);
qs->putQ(newqname.str(),wuid,qi);
return true;
bool res = false;
while (true)
{
void *qi = qs->getQ(curqname.str(), item);
if (!qi)
break;
qs->putQ(newqname.str(), qi);
res = true;
if (oneItem)
break;
}
return res;
}


Expand Down
4 changes: 2 additions & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ typedef unsigned __int64 __uint64;
interface IQueueSwitcher : extends IInterface
{
virtual void * getQ(const char * qname, const char * wuid) = 0;
virtual void putQ(const char * qname, const char * wuid, void * qitem) = 0;
virtual void putQ(const char * qname, void * qitem) = 0;
virtual bool isAuto() = 0;
};

Expand Down Expand Up @@ -1374,7 +1374,7 @@ interface IWorkUnit : extends IConstWorkUnit
virtual void noteFileRead(IDistributedFile * file) = 0;
virtual void noteFieldUsage(IPropertyTree * file) = 0;
virtual void resetBeforeGeneration() = 0;
virtual bool switchThorQueue(const char * newcluster, IQueueSwitcher * qs) = 0;
virtual bool switchThorQueue(const char * newcluster, IQueueSwitcher * qs, const char *item) = 0;
virtual void setAllowedClusters(const char * value) = 0;
virtual void setAllowAutoQueueSwitch(bool val) = 0;
virtual void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash) = 0;
Expand Down
2 changes: 1 addition & 1 deletion common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public:
IWorkUnit &lockRemote(bool commit);
void unlockRemote();
void abort();
bool switchThorQueue(const char *cluster, IQueueSwitcher *qs);
bool switchThorQueue(const char *cluster, IQueueSwitcher *qs, const char *item);
void setAllowedClusters(const char *value);
IStringVal & getAllowedClusters(IStringVal & str) const;
void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const;
Expand Down
41 changes: 26 additions & 15 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,20 +1404,31 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue

IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
{
// will match and remove 1st item with priority >= minprio
StringBuffer path;
IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
if (!item)
return NULL;
if (item->getPropInt("@num",0)<=0)
return NULL; // don't want (old) cached value
if (hasminprio&&(item->getPropInt("@priority")<minprio))
return NULL;
IJobQueueItem *ret = new CJobQueueItem(item);
removeItem(qd,item,saveitem);
unsigned count = qd.root->getPropInt("@count");
assertex(count);
qd.root->setPropInt("@count",count-1);
return ret;
Owned<IPropertyTreeIterator> iter = qd.root->getElements(getItemPath(path,wuid).str());
if (iter->first())
{
while (true)
{
IPropertyTree *item = &iter->query();
if ((item->getPropInt("@num",0) > 0)) // don't want (old) cached value
{
if (!hasminprio || (item->getPropInt("@priority") >= minprio))
{
IJobQueueItem *ret = new CJobQueueItem(item);
removeItem(qd,item,saveitem);
unsigned count = qd.root->getPropInt("@count");
assertex(count);
qd.root->setPropInt("@count",count-1);
return ret;
}
}
if (!iter->next())
break;
}
}
return nullptr;
}

IJobQueueItem *take(sQueueData &qd,const char *wuid)
Expand Down Expand Up @@ -2246,7 +2257,7 @@ extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
Owned<IJobQueue> q = createJobQueue(qname);
return q->take(wuid);
}
void putQ(const char * qname, const char * wuid, void * qitem)
void putQ(const char * qname, void * qitem)
{
Owned<IJobQueue> q = createJobQueue(qname);
q->enqueue((IJobQueueItem *)qitem);
Expand All @@ -2257,5 +2268,5 @@ extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
}
} switcher;

return wu->switchThorQueue(cluster, &switcher);
return wu->switchThorQueue(cluster, &switcher, nullptr);
}
94 changes: 61 additions & 33 deletions dali/sasha/saqmon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "dasds.hpp"
#include "daaudit.hpp"
#include "daqueue.hpp"
#include "saserver.hpp"
#include "workunit.hpp"
#include "wujobq.hpp"
Expand Down Expand Up @@ -110,7 +111,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
}


bool doSwitch(const char *wuid,const char *cluster)
bool doSwitch(const char *item, const char *wuid, const char *cluster)
{
class cQswitcher: public CInterface, implements IQueueSwitcher
{
Expand Down Expand Up @@ -145,7 +146,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
return NULL;
return q->take(wuid);
}
void putQ(const char * qname, const char * wuid, void * qitem)
void putQ(const char * qname, void * qitem)
{
IJobQueue *q = findQueue(qname);
if (q)
Expand All @@ -162,7 +163,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
if (wu)
return wu->switchThorQueue(cluster, &switcher);
return wu->switchThorQueue(cluster, &switcher, item);
return false;
}

Expand All @@ -175,31 +176,37 @@ class CSashaQMonitorServer: public ISashaServer, public Thread

// see if can find candidate on another queue
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
ForEachItemIn(i1,queues) {
if (i1!=qi) {
ForEachItemIn(i1,queues)
{
if (i1!=qi)
{
IJobQueue &srcq = queues.item(i1);
CJobQueueContents qc;
srcq.copyItems(qc);
Owned<IJobQueueIterator> iter = qc.getIterator();
ForEach(*iter) {
ForEach(*iter)
{
const char *wuidGraph = iter->query().queryWUID();
if (!isEmptyString(wuidGraph)) {
const char *sep = strchr(wuidGraph, '/');
StringAttr wuid;
if (sep)
wuid.set(wuidGraph, sep-wuidGraph);
else
wuid.set(wuidGraph);
if (!isEmptyString(wuidGraph))
{
StringArray sArray;
sArray.appendList(wuidGraph, "/");
assertex(3 == sArray.ordinality());
const char *wuid = sArray.item(1);
Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);
if (wu) {
if (wu)
{
SCMStringBuffer allowedClusters;
if (wu->getAllowedClusters(allowedClusters).length()) {
if (wu->getAllowedClusters(allowedClusters).length())
{
StringArray acs;
acs.appendListUniq(allowedClusters.str(), ",");
bool found = true;
ForEachItemIn(i,acs) {
const char *cn = cnames.item(qi);
ForEachItemIn(i,acs)
{
if (strcmp(cnames.item(qi),acs.item(i))==0)
return doSwitch(wuid,acs.item(i));
return doSwitch(wuidGraph, wuid, acs.item(i));
}
}
}
Expand All @@ -222,36 +229,53 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
if (!initQueueNames(qmonprops->queryProp("@queues")))
return 0;
Owned<IRemoteConnection> conn = querySDS().connect("Status/Servers", myProcessSession(), 0, 100000);
if (!conn) {
if (!conn)
{
OERRLOG("cannot connect to Status/Servers");
return -1;
}
unsigned *qidlecount = new unsigned[qnames.ordinality()];
ForEachItemIn(i1,qnames) {
ForEachItemIn(i1,qnames)
{
StringBuffer qname(qnames.item(i1));
qname.append(".thor");
queues.append(*createJobQueue(qname.str()));
qidlecount[i1] = 0;
}
unsigned sleeptime = autoswitch?1:interval;
unsigned moninter = interval;
while (!stopped) {
while (!stopped)
{
stopsem.wait(60*1000*sleeptime);
if (stopped)
break;
moninter-=sleeptime;
if (autoswitch||(moninter==0)) { // always true at moment
try {
if (autoswitch||(moninter==0)) // always true at moment
{
try
{
conn->reload();
ForEachItemIn(qi,qnames) {
ForEachItemIn(qi,qnames)
{
StringBuffer thorQName;
const char *qname = qnames.item(qi);
StringBuffer xpath;
xpath.appendf("Server[@queue=\"%s.thor\"]/WorkUnit",qname);
Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(xpath.str());
getClusterThorQueueName(thorQName, qname);
Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements("Server[@queue]");
Copy link
Member

@ghalliday ghalliday Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `[@queue]` qualifier looks a bit strange. Does it check that the Server node has a @queue attribute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a qualifier without a comparison value: it returns all "Server" nodes that have a @queue attribute.

StringArray wuids;
ForEach(*iter) {
IPropertyTree &wu = iter->query();
wuids.append(wu.queryProp(NULL));
ForEach(*iter)
{
IPropertyTree &server = iter->query();
const char *wuid = server.queryProp("WorkUnit");
if (isEmptyString(wuid))
continue;
const char *queues = server.queryProp("@queue");
if (isEmptyString(queues))
continue;
StringArray queueList;
queueList.appendList(queues, ",");
if (!queueList.contains(thorQName))
continue;
wuids.append(wuid);
}
unsigned enqueued=0;
unsigned connected=0;
Expand All @@ -265,14 +289,18 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
qidlecount[qi] = 0;
}
}
catch (IException *e) {
catch (IException *e)
{
StringBuffer s;
EXCLOG(e, "QMONITOR");
e->Release();
}
if (autoswitch) {
ForEachItemIn(qi2,qnames) {
if (qidlecount[qi2]>autoswitch) { // > not >= to get conservative estimate of how long idle
if (autoswitch)
{
ForEachItemIn(qi2,qnames)
{
if (qidlecount[qi2]>autoswitch) // > not >= to get conservative estimate of how long idle
{
if (switchQueues(qi2))
break; // only switch one per cycle (bit of cop-out)
}
Expand Down
4 changes: 2 additions & 2 deletions ecl/wutest/wujobqtest2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ bool switchWorkunitQueue(const char *wuid, const char *cluster)
Owned<IJobQueue> q = createJobQueue(qname);
return q->take(wuid);
}
void putQ(const char * qname, const char * wuid, void * qitem)
void putQ(const char * qname, void * qitem)
{
Owned<IJobQueue> q = createJobQueue(qname);
q->enqueue((IJobQueueItem *)qitem);
Expand All @@ -56,6 +56,6 @@ bool switchWorkunitQueue(const char *wuid, const char *cluster)
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
if (!wu)
return false;
return wu->switchThorQueue(cluster, &switcher);
return wu->switchThorQueue(cluster, &switcher, nullptr);
}

Loading