Skip to content

Commit

Permalink
HPCC-30534 Prevent spurious workunit failed states
Browse files Browse the repository at this point in the history
The thor agent (managing the instance queue), was spuriously
setting workunits to failed, if the Thor instance died, which
included when the Thor instance span down when idle (linger).
The thoragent manages the instances that jobs target, that
in a default configuration (multiLingerJob) will spin up
when a wuid needs an instance, and spin down when idle.
If whilst and instance was being recycles it threw a k8s
exception (e.g. 'Job has reached the specified backoff limit'),
it would spuriously cause the original workunit that span up the
instance to be marked failed.
The workunit should only be updated at this point, if it is
still marked as having a running state.
Normally the workunit workflow instance should manage the final state.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jul 31, 2024
1 parent 99692c1 commit 5f066c5
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions ecl/agentexec/agentexec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class WaitThread : public CInterfaceOf<IPooledThread>
virtual void threadmain() override
{
Owned<IException> exception;
bool sharedK8sJob = false;
try
{
StringAttr jobSpecName(apptype);
Expand All @@ -276,6 +277,7 @@ class WaitThread : public CInterfaceOf<IPooledThread>
bool useChildProcesses = compConfig->getPropBool("@useChildProcesses");
if (isContainerized() && !useChildProcesses)
{
sharedK8sJob = true;
constexpr unsigned queueWaitingTimeoutMs = 10000;
constexpr unsigned queueWaitingCheckPeriodMs = 1000;
if (!owner.lingerQueue || !queueJobIfQueueWaiting(owner.lingerQueue, item, queueWaitingCheckPeriodMs, queueWaitingCheckPeriodMs))
Expand Down Expand Up @@ -337,13 +339,26 @@ class WaitThread : public CInterfaceOf<IPooledThread>
{
EXCLOG(exception);
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
if (workunit)
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
if (cw)
{
workunit->setState(WUStateFailed);
StringBuffer eStr;
addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0);
workunit->commit();
// if either a) NOT a thoragent with useChildProcesses=false (default in k8s config) or b) is still in an executing state
if (!sharedK8sJob || (cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait))
{
// For a shared k8s job, i.e. where this agent is thoragent launching shared (multiJobLinger) k8s jobs
// the job agent should handle the job state.
// In that scenario, this is a fallback that should only come into effect if the job workflow instance has failed to handle the exception
// e.g. because it abruptly disappeared.
Owned<IWorkUnit> workunit = &cw->lock();
// recheck now locked
if ((workunit->getState() == WUStateRunning) || (workunit->getState() == WUStateBlocked) || (workunit->getState() == WUStateWait))
{
workunit->setState(WUStateFailed);
StringBuffer eStr;
addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0);
workunit->commit();
}
}
}
}
}
Expand Down

0 comments on commit 5f066c5

Please sign in to comment.