Skip to content

Commit

Permalink
Merge pull request #17715 from jakesmith/HPCC-29673-capture-k8sinfo
Browse files Browse the repository at this point in the history
HPCC-29673 Capture job pod names and publish to workunit

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 30, 2023
2 parents 730326e + 1b816ab commit cbbe2dd
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 11 deletions.
49 changes: 49 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4462,6 +4462,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ return c->createException(); }
virtual void addProcess(const char *type, const char *instance, unsigned pid, unsigned max, const char *pattern, bool singleLog, const char *log)
{ c->addProcess(type, instance, pid, max, pattern, singleLog, log); }
virtual bool setContainerizedProcessInfo(const char *type, const char *instance, const char *podName, const char *sequence)
{ return c->setContainerizedProcessInfo(type, instance, podName, sequence); }
virtual void protect(bool protectMode)
{ c->protect(protectMode); }
virtual void setAction(WUAction action)
Expand Down Expand Up @@ -5944,8 +5946,26 @@ void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &stat

static CriticalSection deleteDllLock;
static IWorkQueueThread *deleteDllWorkQ = nullptr;
static unsigned podInfoInitCBId = 0;
static StringBuffer myPodName;

const char *queryMyPodName()
{
return myPodName;
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
auto updateFunc = [&](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
{
if (myPodName.length()) // called at config load time, and never needs to be refreshed
return;
// process pod information from environment
getEnvVar("MY_POD_NAME", myPodName.clear());
PROGLOG("The podName = %s", myPodName.str());
};
if (isContainerized())
podInfoInitCBId = installConfigUpdateHook(updateFunc, true);
return true;
}
MODULE_EXIT()
Expand All @@ -5954,6 +5974,8 @@ MODULE_EXIT()
if (deleteDllWorkQ)
::Release(deleteDllWorkQ);
deleteDllWorkQ = nullptr;

removeConfigUpdateHook(podInfoInitCBId);
}
static void asyncRemoveDll(const char * name)
{
Expand Down Expand Up @@ -8639,6 +8661,33 @@ void CLocalWorkUnit::addProcess(const char *type, const char *instance, unsigned
}
}

bool CLocalWorkUnit::setContainerizedProcessInfo(const char *type, const char *instance, const char *podName, const char *sequence)
{
VStringBuffer processType("Process/%s", type);
CriticalBlock block(crit);
IPropertyTree *node = p->queryPropTree(processType);
if (!node)
node = ensurePTree(p, processType);
StringBuffer instanceXPath(instance);
if (sequence)
instanceXPath.appendf("[@sequence='%s']", sequence);

VStringBuffer instancePodXPath("%s[@podName='%s']", instanceXPath.str(), podName);
IPropertyTree *instanceNode = node->queryPropTree(instancePodXPath);
if (instanceNode)
return false;

// NB: instanceNum represents separate instances of {type,instance,sequence}
// e.g. if there are multiple Thor instances, each will have a distinct "instanceNum"
unsigned instanceNum = node->getCount(instanceXPath)+1;
instanceNode = node->addPropTree(instance);
if (sequence)
instanceNode->setProp("@sequence", sequence); // instance specific, e.g. worker #
instanceNode->setPropInt("@instanceNum", instanceNum);
instanceNode->setProp("@podName", podName);
return true;
}

void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite)
{
StringBuffer lower;
Expand Down
3 changes: 3 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,7 @@ interface IWorkUnit : extends IConstWorkUnit
virtual void commit() = 0;
virtual IWUException * createException() = 0;
virtual void addProcess(const char *type, const char *instance, unsigned pid, unsigned max, const char *pattern, bool singleLog, const char *log=nullptr) = 0;
virtual bool setContainerizedProcessInfo(const char *type, const char *instance, const char *podName, const char *sequence) = 0;
virtual void setAction(WUAction action) = 0;
virtual void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite) = 0;
virtual void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite) = 0;
Expand Down Expand Up @@ -1787,6 +1788,8 @@ extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid,
// returns a vector of {pod-name, node-name} vectors,
extern WORKUNIT_API std::vector<std::vector<std::string>> getPodNodes(const char *selector);

extern WORKUNIT_API const char *queryMyPodName();

extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::initializer_list<TraceOption> & y, TraceFlags dft);


Expand Down
1 change: 1 addition & 0 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ public:
void commit();
IWUException *createException();
void addProcess(const char *type, const char *instance, unsigned pid, unsigned max, const char *pattern, bool singleLog, const char *log);
bool setContainerizedProcessInfo(const char *type, const char *instance, const char *podName, const char *sequence);
void setAction(WUAction action);
void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite);
void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite);
Expand Down
8 changes: 8 additions & 0 deletions ecl/agentexec/agentexec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,20 @@ class WaitThread : public CInterfaceOf<IPooledThread>
{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
if (isContainerized())
workunit->setContainerizedProcessInfo("AgentExec", compConfig->queryProp("@name"), queryMyPodName(), nullptr);
addTimeStamp(workunit, wfid, graphName, StWhenK8sLaunched);
}
runK8sJob(jobSpecName, wuid, jobName, params);
}
else
{
if (isContainerized())
{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid.str());
workunit->setContainerizedProcessInfo("AgentExec", compConfig->queryProp("@name"), queryMyPodName(), nullptr);
}
bool useValgrind = compConfig->getPropBool("@valgrind", false);
VStringBuffer exec("%s%s --workunit=%s --daliServers=%s", useValgrind ? "valgrind " : "", processName.get(), wuid.str(), dali.str());
if (compConfig->hasProp("@config"))
Expand Down
7 changes: 6 additions & 1 deletion ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,12 @@ void EclAgent::doProcess()
traceLevel = w->getDebugValueInt("traceLevel", 10);
w->setTracingValue("EclAgentBuild", hpccBuildInfo.buildTag);
if (agentTopology->hasProp("@name"))
w->addProcess("EclAgent", agentTopology->queryProp("@name"), GetCurrentProcessId(), 0, nullptr, false, logname.str());
{
if (isContainerized())
w->setContainerizedProcessInfo("EclAgent", agentTopology->queryProp("@name"), queryMyPodName(), nullptr);
else
w->addProcess("EclAgent", agentTopology->queryProp("@name"), GetCurrentProcessId(), 0, nullptr, false, logname.str());
}

eclccCodeVersion = w->getCodeVersion();
if (eclccCodeVersion == 0)
Expand Down
13 changes: 6 additions & 7 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1563,15 +1563,14 @@ void EclAgent::updateWULogfile(IWorkUnit *outputWU)
rlf.setLocalPath(logname);
rlf.getRemotePath(logname.clear());

if (outputWU)
Owned<IWorkUnit> w;
if (!outputWU)
{
outputWU->addProcess("EclAgent", agentTopology->queryProp("@name"), GetCurrentProcessId(), 0, nullptr, false, logname.str());
}
else
{
Owned<IWorkUnit> w = updateWorkUnit();
w->addProcess("EclAgent", agentTopology->queryProp("@name"), GetCurrentProcessId(), 0, nullptr, false, logname.str());
w.setown(updateWorkUnit());
outputWU = w;
}

outputWU->addProcess("EclAgent", agentTopology->queryProp("@name"), GetCurrentProcessId(), 0, nullptr, false, logname.str());
}
else
{
Expand Down
9 changes: 9 additions & 0 deletions ecl/eclccserver/eclccserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,11 @@ class EclccCompileThread : implements IPooledThread, implements IErrorReporter,
{
if (!useChildProcesses && !childProcessTimeLimit && !config->hasProp("@workunit"))
{
{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid.get());
wu->setContainerizedProcessInfo("EclCCServer", getComponentConfigSP()->queryProp("@name"), queryMyPodName(), nullptr);
}
compileViaK8sJob(true);
return;
}
Expand All @@ -905,6 +910,10 @@ class EclccCompileThread : implements IPooledThread, implements IErrorReporter,
DBGLOG("Workunit %s no longer exists", wuid.get());
return;
}

if (isContainerized())
workunit->setContainerizedProcessInfo("EclCC", getComponentConfigSP()->queryProp("@name"), queryMyPodName(), nullptr);

if (workunit->aborting() || workunit->getState()==WUStateAborted)
{
workunit->setState(WUStateAborted);
Expand Down
24 changes: 23 additions & 1 deletion helm/hpcc/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,7 @@ Pass in dict with root, me and dali if container in dali pod
{{- $serviceName := printf "sasha-%s" .me.name }}
{{- $overrideDaliHost := .overrideDaliHost | default "" }}
{{- $overrideDaliPort := .overrideDaliPort | default 0 }}
{{- $env := concat (.root.Values.global.env | default list) (.env | default list) }}
- name: {{ $serviceName | quote }}
workingDir: /var/lib/HPCCSystems
command: [ saserver ]
Expand All @@ -1241,6 +1242,7 @@ Pass in dict with root, me and dali if container in dali pod
{{- include "hpcc.addResources" (dict "me" .me.resources) | indent 2 }}
{{- include "hpcc.addSecurityContext" . | indent 2 }}
env:
{{ include "hpcc.mergeEnvironments" $env | indent 2 -}}
- name: "SENTINEL"
value: "/tmp/{{ $serviceName }}.sentinel"
{{- with (dict "name" $serviceName) }}
Expand Down Expand Up @@ -2138,7 +2140,27 @@ A template to output a merged environment. Pass in a list with global then local
- name: {{ $key | quote }}
value: {{ $value | quote }}
{{ end -}}
{{- end -}}
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MY_POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: MY_POD_SERVICE_ACCOUNT
valueFrom:
fieldRef:
fieldPath: spec.serviceAccountName
{{ end -}}


{{/*
Expand Down
50 changes: 49 additions & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
limitations under the License.
############################################################################## */

#include <future>
#include <chrono>
#include <future>
#include <string>
#include <unordered_set>

#include "platform.h"
#include <math.h>
Expand Down Expand Up @@ -1280,6 +1282,28 @@ static int recvNextGraph(unsigned timeoutMs, const char *wuid, StringBuffer &ret
return 1; // success
}


static std::vector<std::string> connectedWorkerPods;
void addConnectedWorkerPod(const char *podName)
{
connectedWorkerPods.push_back(podName);
}

static bool podInfoPublished = false;
void publishPodNames(IWorkUnit *workunit)
{
// skip if Thor manager already published (implying worker pods already published too)
if (workunit->setContainerizedProcessInfo("Thor", globals->queryProp("@name"), queryMyPodName(), nullptr))
{
for (unsigned workerNum=0; workerNum<connectedWorkerPods.size(); workerNum++)
{
const char *workerPodName = connectedWorkerPods[workerNum].c_str();
workunit->setContainerizedProcessInfo("ThorWorker", globals->queryProp("@name"), workerPodName, std::to_string(workerNum+1).c_str());
}
}
podInfoPublished = true;
}

void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphName)
{
aborting = 0;
Expand Down Expand Up @@ -1308,6 +1332,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam

enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.

std::unordered_set<std::string> publishedPodWuids;
Owned<CJobManager> jobManager = new CJobManager(logHandler);
try
{
Expand Down Expand Up @@ -1342,6 +1367,11 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
if (!podInfoPublished)
{
Owned<IWorkUnit> wu = &workunit->lock();
publishPodNames(wu);
}
SocketEndpoint dummyAgentEp;
jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);
IException *e = jobManager->queryExitException();
Expand Down Expand Up @@ -1383,6 +1413,24 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
int ret = recvNextGraph(remaining, currentWuid.str(), wuid, currentGraphName);
if (ret > 0)
{
if (!streq(currentWuid, wuid))
{
// perhaps slightly overkill, but avoid checking/locking wuid to add pod info.
// if this instance has already done so.
auto it = publishedPodWuids.find(wuid.str());
if (it == publishedPodWuids.end())
{
podInfoPublished = false;

// trivial safe-guard against growing too big
// but unlikely to ever grow this big
if (publishedPodWuids.size() > 10000)
publishedPodWuids.clear();

publishedPodWuids.insert(wuid.str());
}
// NB: this set of pods could still already be published, if so, publishPodNames will not re-add.
}
currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
break; // success
}
Expand Down
4 changes: 4 additions & 0 deletions thorlcr/master/thgraphmanager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ void abortThor(IException *e, unsigned errCode, bool abortCurrentJob=true);
void setExitCode(int code);
int queryExitCode();

void addConnectedWorkerPod(const char *podName);
void publishPodNames(IWorkUnit *workunit);


#endif
9 changes: 8 additions & 1 deletion thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,16 @@ class CRegistryServer : public CSimpleInterface
*/
unsigned slaveNum;
msg.read(slaveNum);
StringBuffer slavePodName;
if (NotFound == slaveNum)
{
connectedSlaves.append(sender.getLink());
slaveNum = connectedSlaves.ordinality();
if (isContainerized())
{
msg.read(slavePodName);
addConnectedWorkerPod(slavePodName); // NB: these are added in worker # order
}
}
else
{
Expand All @@ -313,7 +319,7 @@ class CRegistryServer : public CSimpleInterface
--remaining;
}
assertex(slaves == connectedSlaves.ordinality());

if (isContainerized())
{
unsigned wfid = globals->getPropInt("@wfid");
Expand All @@ -322,6 +328,7 @@ class CRegistryServer : public CSimpleInterface
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
addTimeStamp(workunit, wfid, graphName, StWhenK8sReady);
publishPodNames(workunit);
}

unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
Owned<INode> masterNode = createINode(ep);
CMessageBuffer msg;
msg.append(mySlaveNum);
if (isContainerized())
msg.append(queryMyPodName());
queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION);
if (!queryWorldCommunicator().recv(msg, masterNode, MPTAG_THORREGISTRATION))
return false;
Expand Down

0 comments on commit cbbe2dd

Please sign in to comment.