Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.0.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
ghalliday committed Mar 30, 2023
2 parents ecd824b + ab89519 commit b6c7273
Show file tree
Hide file tree
Showing 102 changed files with 4,181 additions and 1,163 deletions.
6 changes: 3 additions & 3 deletions common/remote/rmtspawn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void getRemoteSpawnSSH(
}


ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & childEP, unsigned version, const char *logdir, IAbortRequestCallback * abort, const char *extra)
ISocket *spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & childEP, unsigned version, const char *logdir, IAbortRequestCallback * abort, const char *extra, HANDLE *localProcessHandle)
{
SocketEndpoint myEP;
myEP.setLocalHost(0);
Expand Down Expand Up @@ -143,15 +143,15 @@ ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoin

#ifdef _CONTAINERIZED
DWORD runcode;
if (!invoke_program(cmd.str(), runcode, false))
if (!invoke_program(cmd.str(), runcode, false, nullptr, localProcessHandle))
throw makeStringExceptionV(-1,"Error spawning %s", exe);
#else
//Run the program directly if it is being run on the local machine - so ssh doesn't need to be running...
//Change once we have solved the problems with ssh etc. on windows?
if (childEP.isLocal())
{
DWORD runcode;
if (!invoke_program(cmd.str(), runcode, false))
if (!invoke_program(cmd.str(), runcode, false, nullptr, localProcessHandle))
throw makeStringExceptionV(-1,"Error spawning %s", exe);
}
else
Expand Down
2 changes: 1 addition & 1 deletion common/remote/rmtspawn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ enum SpawnKind

interface IAbortRequestCallback;

extern REMOTE_API ISocket * spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & remoteEP, unsigned version, const char *logdir, IAbortRequestCallback * abort = NULL, const char *extra=NULL);
extern REMOTE_API ISocket *spawnRemoteChild(SpawnKind kind, const char * exe, const SocketEndpoint & remoteEP, unsigned version, const char *logdir, IAbortRequestCallback * abort = nullptr, const char *extra=nullptr, HANDLE *localProcessHandle=nullptr);
extern REMOTE_API void setRemoteSpawnSSH(
const char *identfilename,
const char *username, // if NULL then disable SSH
Expand Down
2 changes: 2 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ inline unsigned getCompMethod(unsigned flags)

inline unsigned getCompMethod(const char *compStr)
{
//Could change to return translateToCompMethod(compStr);
//but would need to extend rw flags to cope with the other variants
unsigned compMethod = COMPRESS_METHOD_LZ4;
if (!isEmptyString(compStr))
{
Expand Down
82 changes: 73 additions & 9 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12276,7 +12276,6 @@ extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username,
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
assertex(cw);
StringAttr clusterName(cw->queryClusterName());
cw.clear();
if (!clusterName.length())
throw MakeStringException(WUERR_InvalidCluster, "No target cluster specified");

Expand All @@ -12287,6 +12286,12 @@ extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username,
if (!queue.get())
throw MakeStringException(WUERR_InvalidQueue, "Could not create workunit queue");

{
Owned<IWorkUnit> wu = &cw->lock();
addTimeStamp(wu, SSTcompilestage, "compile", StWhenQueued, 0);
}

cw.clear();
IJobQueueItem *item = createJobQueueItem(wuid);
queue->enqueue(item);
}
Expand Down Expand Up @@ -13883,6 +13888,11 @@ extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, StatisticScopeType scopeTy
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scopestr, kind, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend);
}

/*
 * Overload of addTimeStamp that takes a workflow id instead of an explicit scope type.
 * The scope type is obtained by passing the scope text to getScopeType(), and the
 * request is then forwarded to the (wu, scopeType, scope, kind, wfid) overload.
 *
 * wu    - workunit passed through to the forwarding call
 * wfid  - workflow id passed through as the final argument
 * scope - scope text; used both to derive the scope type and as the scope itself
 * kind  - statistic kind passed through unchanged
 */
extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, unsigned wfid, const char * scope, StatisticKind kind)
{
    const StatisticScopeType derivedScopeType = getScopeType(scope);
    addTimeStamp(wu, derivedScopeType, scope, kind, wfid);
}

static double getCpuSize(const char *resourceName)
{
Owned<IPropertyTree> compConfig = getComponentConfig();
Expand Down Expand Up @@ -14154,6 +14164,9 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP

StringAttr owner(workunit.queryUser());
int priority = workunit.getPriorityValue();

VStringBuffer jobName("%u/%s/%s", wfid, wuid.get(), graphName);

#ifdef _CONTAINERIZED
// NB: If a single agent were to want to launch >1 Thor, then the threading could be in the workflow above this call.

Expand All @@ -14180,8 +14193,13 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
// or an existing idle Thor listening on the same queue will pick it up.
VStringBuffer queueName("%s.thor", config.queryProp("@queue"));
DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);

{
Owned<IWorkUnit> w = &workunit.lock();
addTimeStamp(w, wfid, graphName, StWhenQueued);
}

Owned<IJobQueue> queue = createJobQueue(queueName);
VStringBuffer jobName("%s/%s", wuid.get(), graphName);
IJobQueueItem *item = createJobQueueItem(jobName);
item->setOwner(owner);
item->setPriority(priority);
Expand Down Expand Up @@ -14350,10 +14368,14 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP

pollthread.start();

{
Owned<IWorkUnit> w = &workunit.lock();
addTimeStamp(w, wfid, graphName, StWhenQueued);
}

CCycleTimer elapsedTimer;
PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d seconds, priority=%d", queueName.str(), wuid.str(), graphName, timelimit, priority);
VStringBuffer wuidGraph("%s/%s", wuid.str(), graphName);
IJobQueueItem* item = createJobQueueItem(wuidGraph.str());
IJobQueueItem* item = createJobQueueItem(jobName);
item->setOwner(owner.str());
item->setPriority(priority);
Owned<IConversation> conversation = jq->initiateConversation(item);
Expand All @@ -14363,15 +14385,15 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
if (!got)
{
if (pollthread.timedout)
throw MakeStringException(0, "Query %s failed to start within specified timelimit (%u) seconds", wuidGraph.str(), timelimit);
throw MakeStringException(0, "Query %s cancelled (1)", wuidGraph.str());
throw MakeStringException(0, "Query %s failed to start within specified timelimit (%u) seconds", jobName.str(), timelimit);
throw MakeStringException(0, "Query %s cancelled (1)", jobName.str());
}
// get the thor ep from whoever picked up

SocketEndpoint thorMaster;
MemoryBuffer msg;
if (!conversation->recv(msg,1000*60))
throw MakeStringException(0, "Query %s cancelled (2)", wuidGraph.str());
throw MakeStringException(0, "Query %s cancelled (2)", jobName.str());
thorMaster.deserialize(msg);
msg.clear();
SocketEndpoint myep;
Expand All @@ -14389,7 +14411,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
}

StringBuffer eps;
PROGLOG("Thor on %s running %s", thorMaster.getUrlStr(eps).str(), wuidGraph.str());
PROGLOG("Thor on %s running %s", thorMaster.getUrlStr(eps).str(), jobName.str());
MemoryBuffer reply;
try
{
Expand Down Expand Up @@ -14420,7 +14442,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
if (isException)
{
Owned<IException> e = deserializeException(reply);
VStringBuffer str("Pausing job %s caused exception", wuidGraph.str());
VStringBuffer str("Pausing job %s caused exception", jobName.str());
EXCLOG(e, str.str());
}
Owned<IWorkUnit> w = &workunit.lock();
Expand Down Expand Up @@ -14695,6 +14717,7 @@ bool applyK8sYaml(const char *componentName, const char *wuid, const char *job,
jobYaml.replaceString("_HPCC_JOBNAME_", jobName.str());

VStringBuffer args("\"--workunit=%s\"", wuid);
args.append(" \"--k8sJob=true\"");
for (const auto &p: extraParams)
{
if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution
Expand Down Expand Up @@ -14814,4 +14837,45 @@ std::vector<std::vector<std::string>> getPodNodes(const char *selector)
}
}

#else
// Stub definition of translateKeepJobs: the body only calls throwUnexpected() and has no return statement.
// The keepJobs argument is not inspected.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
KeepK8sJobs translateKeepJobs(const char *keepJobs)
{
throwUnexpected();
}

// Stub definition of isActiveK8sService: the body only calls throwUnexpected() and has no return statement.
// The serviceName argument is not inspected.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
bool isActiveK8sService(const char *serviceName)
{
throwUnexpected();
}

// Stub definition of executeGraphOnLingeringThor: the body only calls throwUnexpected() and has no return statement.
// Neither the workunit nor graphName is used.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName)
{
throwUnexpected();
}

// Stub definition of deleteK8sResource: the body only calls throwUnexpected(); none of the arguments are used.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
void deleteK8sResource(const char *componentName, const char *job, const char *resource)
{
throwUnexpected();
}

// Stub definition of waitK8sJob: the body only calls throwUnexpected(); no waiting is performed and no argument is used.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
void waitK8sJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob)
{
throwUnexpected();
}

// Stub definition of applyK8sYaml: the body only calls throwUnexpected() and has no return statement.
// None of the arguments (including extraParams, optional and autoCleanup) are used.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional, bool autoCleanup)
{
throwUnexpected();
}

// Stub definition of runK8sJob: the body only calls throwUnexpected(); no job is launched and no argument is used.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams)
{
throwUnexpected();
}

// Stub definition of getPodNodes: the body only calls throwUnexpected() and has no return statement.
// The selector argument is not inspected.
// NOTE(review): sits after an `#else`; presumably the branch built without k8s support - confirm against the matching #if.
std::vector<std::vector<std::string>> getPodNodes(const char *selector)
{
throwUnexpected();
}

#endif
3 changes: 1 addition & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,7 @@ extern WORKUNIT_API const char * getWorkunitActionStr(WUAction action);
extern WORKUNIT_API WUAction getWorkunitAction(const char * actionStr);

extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, unsigned wfid=0);
extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, unsigned wfid, const char * scope, StatisticKind kind);
extern WORKUNIT_API double getMachineCostRate();
extern WORKUNIT_API double getThorManagerRate();
extern WORKUNIT_API double getThorWorkerRate();
Expand Down Expand Up @@ -1771,7 +1772,6 @@ inline double calcCost(double ratePerHour, unsigned __int64 ms) { return ratePer

extern WORKUNIT_API void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IPropertyTree &config);

#ifdef _CONTAINERIZED
enum class KeepK8sJobs { none, podfailures, all };
extern WORKUNIT_API KeepK8sJobs translateKeepJobs(const char *keepJobs);

Expand All @@ -1784,7 +1784,6 @@ extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid,

// Returns a vector of {pod-name, node-name} vectors.
extern WORKUNIT_API std::vector<std::vector<std::string>> getPodNodes(const char *selector);
#endif

extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::initializer_list<TraceOption> & y, TraceFlags dft);

Expand Down
6 changes: 6 additions & 0 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,12 @@ extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName)
if (!queue.get())
throw MakeStringException(-1, "Could not create workunit queue");

{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
addTimeStamp(wu, SSTglobal, "", StWhenQueued, 0);
}

IJobQueueItem *item = createJobQueueItem(wuid);
queue->enqueue(item);
PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
Expand Down
11 changes: 9 additions & 2 deletions dali/ft/daftformat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,7 @@ void CRemotePartitioner::prepareCmd(MemoryBuffer &msg)

void CRemotePartitioner::callRemote()
{
HANDLE localFtSlaveHandle = 0; // used only if ftslave is launched on this host
try
{
LogMsgJobInfo job(unknownJob);
Expand All @@ -1901,8 +1902,7 @@ void CRemotePartitioner::callRemote()
if (sprayer.useFtSlave)
{
// NB: In containerized mode, spawnRemoteChild will launch ftslave's locally and set connectEP to localhost
StringBuffer tmp;
socket.setown(spawnRemoteChild(SPAWNdfu, slave, connectEP, DAFT_VERSION, queryFtSlaveLogDir(), nullptr, wuid));
socket.setown(spawnRemoteChild(SPAWNdfu, slave, connectEP, DAFT_VERSION, queryFtSlaveLogDir(), nullptr, wuid, &localFtSlaveHandle));
if (!socket)
throwError1(DFTERR_FailedStartSlave, url.str());

Expand Down Expand Up @@ -1962,6 +1962,13 @@ void CRemotePartitioner::callRemote()

if (sem)
sem->signal();

if (localFtSlaveHandle)
{
DWORD runcode;
if (!wait_program_timeout(localFtSlaveHandle, runcode, 5000))
WARNLOG("CRemotePartitioner::callRemote - Timed out waiting for local FtSlave process to exit");
}
}


Expand Down
Loading

0 comments on commit b6c7273

Please sign in to comment.