Skip to content

Commit

Permalink
Merge pull request #18933 from jakesmith/HPCC-30534-prevent-failed-wuids
Browse files: browse the repository at this point in the history
HPCC-30534 Prevent spurious workunit failed states

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 2, 2024
2 parents a7769d2 + 5f066c5 commit e93dc6a
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions ecl/agentexec/agentexec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class WaitThread : public CInterfaceOf<IPooledThread>
virtual void threadmain() override
{
Owned<IException> exception;
bool sharedK8sJob = false;
try
{
StringAttr jobSpecName(apptype);
Expand All @@ -276,6 +277,7 @@ class WaitThread : public CInterfaceOf<IPooledThread>
bool useChildProcesses = compConfig->getPropBool("@useChildProcesses");
if (isContainerized() && !useChildProcesses)
{
sharedK8sJob = true;
constexpr unsigned queueWaitingTimeoutMs = 10000;
constexpr unsigned queueWaitingCheckPeriodMs = 1000;
if (!owner.lingerQueue || !queueJobIfQueueWaiting(owner.lingerQueue, item, queueWaitingCheckPeriodMs, queueWaitingCheckPeriodMs))
Expand Down Expand Up @@ -337,13 +339,26 @@ class WaitThread : public CInterfaceOf<IPooledThread>
{
EXCLOG(exception);
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
if (workunit)
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
if (cw)
{
workunit->setState(WUStateFailed);
StringBuffer eStr;
addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0);
workunit->commit();
// Proceed if either a) this is NOT a thoragent with useChildProcesses=false (the default in k8s config), or b) the workunit is still in an executing state
if (!sharedK8sJob || (cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait))
{
// For a shared k8s job, i.e. where this agent is a thoragent launching shared (multiJobLinger) k8s jobs,
// the job agent should handle the job state.
// In that scenario, this is a fallback that should only come into effect if the job workflow instance has failed to handle the exception,
// e.g. because it abruptly disappeared.
Owned<IWorkUnit> workunit = &cw->lock();
// recheck now locked
if ((workunit->getState() == WUStateRunning) || (workunit->getState() == WUStateBlocked) || (workunit->getState() == WUStateWait))
{
workunit->setState(WUStateFailed);
StringBuffer eStr;
addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0);
workunit->commit();
}
}
}
}
}
Expand Down

0 comments on commit e93dc6a

Please sign in to comment.