Skip to content

Commit

Permalink
HPCC-31291 Add support for Thor Sasha QMon to k8s
Browse files Browse the repository at this point in the history
Sasha queue monitoring was missing from the containerized
platform and configuration.
Add the ability to configure and run this via helm charts.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Feb 19, 2024
1 parent 179d2e9 commit 2cb0fe4
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 23 deletions.
1 change: 1 addition & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9877,6 +9877,7 @@ bool CLocalWorkUnit::switchThorQueue(const char *newCluster, IQueueSwitcher *qs,
item = tmpItem;
}

PROGLOG("Workunit '%s' switched from Thor queue '%s' to '%s'", p->queryName(), curqname.str(), newqname.str());
setClusterName(newCluster);

bool res = false;
Expand Down
72 changes: 54 additions & 18 deletions dali/sasha/saqmon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include "workunit.hpp"
#include "wujobq.hpp"
#include "environment.hpp"
#include "jconfig.hpp"

#ifndef _CONTAINERIZED
//not currently created or used in the containerized version

//#define TESTING
Expand Down Expand Up @@ -71,36 +71,58 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
{
if (!qlist||!*qlist)
return false;
if (!qinitdone) {
if (!qinitdone)
{
qinitdone = true;
StringArray qs;
qs.appendListUniq(qlist, ",");
if (!qs.ordinality())
return false;
StringArray tna;
#ifdef _CONTAINERIZED
Owned<IStringIterator> cnaIter = config::getContainerTargets("thor", nullptr);
if (!cnaIter->first())
return false;
while (true)
{
SCMStringBuffer target;
cnaIter->str(target);
tna.append(target.str());
if (!cnaIter->next())
break;
}
#else
StringArray cna;
StringArray gna;
StringArray tna;
StringArray qna;
if (getEnvironmentThorClusterNames(cna,gna,tna,qna)==0)
return false;
ForEachItemIn(i1,tna) {
#endif
ForEachItemIn(i1,tna)
{
const char *qname = tna.item(i1); // JCSMORE - ThorQMon/@queues is actually matching targets, rename property to @targets ?
bool ok = false;
ForEachItemIn(i2,qs) {
if (WildMatch(qname,qs.item(i2),true)) {
ForEachItemIn(i2,qs)
{
if (WildMatch(qname,qs.item(i2),true))
{
ok = true;
break;
}
}
if (ok) {
if (ok)
{
// see if already done
ForEachItemIn(i2,qnames) {
if (strcmp(qname,qnames.item(i2))==0) {
ForEachItemIn(i2,qnames)
{
if (strcmp(qname,qnames.item(i2))==0)
{
ok = false;
break;
}
}
if (ok) {
if (ok)
{
qnames.append(qname);
cnames.append(tna.item(i1));
}
Expand Down Expand Up @@ -219,14 +241,30 @@ class CSashaQMonitorServer: public ISashaServer, public Thread

int run()
{
Owned<IPropertyTree> qmonprops = serverConfig->getPropTree("ThorQMon");
if (!qmonprops)
qmonprops.setown(createPTree("ThorQMon"));
Owned<IPropertyTree> qmonprops;
if (isContainerized())
qmonprops.set(serverConfig);
else
{
qmonprops.setown(serverConfig->getPropTree("ThorQMon"));
if (!qmonprops)
qmonprops.setown(createPTree("ThorQMon"));
}
unsigned interval = qmonprops->getPropInt("@interval",DEFAULT_QMONITOR_INTERVAL); // probably always 1
unsigned autoswitch = qmonprops->getPropInt("@switchMinTime",0);
if (!interval)
return 0;
if (!initQueueNames(qmonprops->queryProp("@queues")))

// In bare-metal, historically autoswitching has been disabled by default.
// However, it is usually enabled in the environment.xml in most deployments.
// To simplify the containerized configuration, we set autoswitching period to match the interval by default.
unsigned autoSwitchDefault = isContainerized() ? interval : 0;

unsigned autoswitch = qmonprops->getPropInt("@switchMinTime", autoSwitchDefault);

const char *configQueues = qmonprops->queryProp("@queues");
if (!configQueues && isContainerized())
configQueues = "*"; // NB: this is the bare-metal default too (from stock environment.xml)
if (!initQueueNames(configQueues))
return 0;
Owned<IRemoteConnection> conn = querySDS().connect("Status/Servers", myProcessSession(), 0, 100000);
if (!conn)
Expand All @@ -238,7 +276,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread
ForEachItemIn(i1,qnames)
{
StringBuffer qname(qnames.item(i1));
qname.append(".thor");
qname.append(THOR_QUEUE_EXT);
queues.append(*createJobQueue(qname.str()));
qidlecount[i1] = 0;
}
Expand Down Expand Up @@ -324,5 +362,3 @@ ISashaServer *createSashaQMonitorServer()
sashaQMonitorServer = new CSashaQMonitorServer();
return sashaQMonitorServer;
}

#endif // !_CONTAINERIZED
2 changes: 2 additions & 0 deletions dali/sasha/saserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ int main(int argc, const char* argv[])
servers.append(*createSashaCachedWURemoverServer());
else if (strieq(service, "file-expiry"))
servers.append(*createSashaFileExpiryServer());
else if (strieq(service, "thor-qmon"))
servers.append(*createSashaQMonitorServer());
//else if (strieq(service, "xref")) // TODO
// servers.append(*createSashaXrefServer());
else
Expand Down
6 changes: 6 additions & 0 deletions helm/hpcc/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,10 @@ data:
{{- $sashaStoragePlane := .me.plane | default (include "hpcc.getFirstPlaneForCategory" (dict "root" .root "category" "sasha")) }}
{{- $_ := set .me "plane" $sashaStoragePlane }}
storagePath: {{ include "hpcc.getPlanePrefix" (dict "root" .root "planeName" $sashaStoragePlane) }}
{{- end }}
{{- if (has "queues" .me.access) }}
queues:
{{ include "hpcc.generateConfigMapQueues" .root | indent 6 }}
{{- end }}
global:
{{ include "hpcc.generateGlobalConfigMap" .root | indent 6 }}
Expand Down Expand Up @@ -1576,6 +1580,8 @@ dali
dali
{{- else if (eq "file-expiry" .name) -}}
dali data
{{- else if (eq "thor-qmon" .name) -}}
dali queues
{{- else -}}
{{- $_ := fail (printf "Unknown sasha service:" .name ) -}}
{{- end -}}
Expand Down
29 changes: 29 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2854,6 +2854,32 @@
},
"additionalProperties": false
},
"sasha-thor-qmon" : {
"type": "object",
"allOf": [{ "$ref": "#/definitions/sashacommon" }],
"properties": {
"queues": {
"description": "A list of queues to monitor",
"type": "array",
"items": { "type": "string" },
"default": "*"
},
"switchMinTime": {
"type": "integer",
"description": "Minimum time (in seconds) to wait before switching to a different queue",
"default": "0"
},
"disabled": {},
"hold": {},
"interval": {},
"image": {},
"resources": {},
"annotations": {},
"labels": {},
"egress": {}
},
"additionalProperties": false
},
"sashaservice": {
"oneOf": [
{
Expand All @@ -2875,6 +2901,9 @@
"file-expiry": {
"$ref": "#/definitions/sasha-file-expiry"
},
"thor-qmon": {
"$ref": "#/definitions/sasha-thor-qmon"
},
"disabled": {
"type": "boolean"
}
Expand Down
6 changes: 6 additions & 0 deletions helm/hpcc/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ sasha:
#expiryDefault: 4
#user: sasha

thor-qmon: {} # NB: no properties defined, use defaults
#disabled: true
#switchMinTime: 1
#queues: "*"


dfuserver:
- name: dfuserver
maxJobs: 1
Expand Down
15 changes: 10 additions & 5 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "daclient.hpp"
#include "dadfs.hpp"
#include "dalienv.hpp"
#include "daqueue.hpp"
#include "dasds.hpp"
#include "dllserver.hpp"
#include "workunit.hpp"
Expand Down Expand Up @@ -960,13 +961,17 @@ int main( int argc, const char *argv[] )
bool workerNSInstalled = false;
bool workerJobInstalled = false;

#ifndef _CONTAINERIZED
SCMStringBuffer _queueNames;
const char *thorName = globals->queryProp("@name");
if (!thorName) thorName = "thor";
getThorQueueNames(_queueNames, thorName);
queueName.set(_queueNames.str());
#ifdef _CONTAINERIZED
StringBuffer queueNames;
getClusterThorQueueName(queueNames, thorName);
#else
if (!thorName)
thorName = "thor";
SCMStringBuffer queueNames;
getThorQueueNames(queueNames, thorName);
#endif
queueName.set(queueNames.str());

Owned<IException> exception;
try
Expand Down

0 comments on commit 2cb0fe4

Please sign in to comment.