diff --git a/ecl/agentexec/agentexec.cpp b/ecl/agentexec/agentexec.cpp index 3cb3364ebb4..58ab247d984 100644 --- a/ecl/agentexec/agentexec.cpp +++ b/ecl/agentexec/agentexec.cpp @@ -261,6 +261,7 @@ class WaitThread : public CInterfaceOf virtual void threadmain() override { Owned exception; + bool sharedK8sJob = false; try { StringAttr jobSpecName(apptype); @@ -276,6 +277,7 @@ class WaitThread : public CInterfaceOf bool useChildProcesses = compConfig->getPropBool("@useChildProcesses"); if (isContainerized() && !useChildProcesses) { + sharedK8sJob = true; constexpr unsigned queueWaitingTimeoutMs = 10000; constexpr unsigned queueWaitingCheckPeriodMs = 1000; if (!owner.lingerQueue || !queueJobIfQueueWaiting(owner.lingerQueue, item, queueWaitingCheckPeriodMs, queueWaitingCheckPeriodMs)) @@ -337,13 +339,26 @@ class WaitThread : public CInterfaceOf { EXCLOG(exception); Owned factory = getWorkUnitFactory(); - Owned workunit = factory->updateWorkUnit(wuid); - if (workunit) + Owned cw = factory->openWorkUnit(wuid); + if (cw) { - workunit->setState(WUStateFailed); - StringBuffer eStr; - addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0); - workunit->commit(); + // if either a) NOT a thoragent with useChildProcesses=false (default in k8s config) or b) is still in an executing state + if (!sharedK8sJob || (cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait)) + { + // For a shared k8s job, i.e. where this agent is thoragent launching shared (multiJobLinger) k8s jobs + // the job agent should handle the job state. + // In that scenario, this is a fallback that should only come into effect if the job workflow instance has failed to handle the exception + // e.g. because it abruptly disappeared. + Owned workunit = &cw->lock(); + // recheck now locked + if ((workunit->getState() == WUStateRunning) || (workunit->getState() == WUStateBlocked) || (workunit->getState() == WUStateWait)) + { + workunit->setState(WUStateFailed); + StringBuffer eStr; + addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0); + workunit->commit(); + } + } } } }