Skip to content

Commit

Permalink
HPCC-31009 Ensure Thor startup errors are properly reported.
Browse files Browse the repository at this point in the history
This also closes a few paths in k8s where an early error,
e.g. an error during worker registration, would not be
correctly reported back to the workunit, and cause the
agent to be unaware of the failure and deadlock.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Dec 19, 2023
1 parent c3e6871 commit a4f279c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 80 deletions.
6 changes: 3 additions & 3 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14269,16 +14269,16 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
unsigned runningTimeLimit = workunit.getDebugValueInt("maxRunTime", 0);
runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;

std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
std::list<WUState> expectedStates = { WUStateRunning, WUStateWait, WUStateFailed };
unsigned __int64 blockedTime = 0;
for (unsigned i=0; i<2; i++)
{
WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
DBGLOG("Got state: %s", getWorkunitStateStr(state));
if (WUStateWait == state) // already finished
if ((WUStateWait == state) || (WUStateFailed == state)) // already finished or failed
{
// workunit may have spent time in blocked state, but then transitioned to
// wait state quickly such that this code did not see its running state.
// wait or failed state quickly such that this code did not see its running state.
if (!blockedTime)
blockedTime = elapsedTimer.elapsedNs();
break;
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void relayWuidException(IConstWorkUnit *workunit, const IException *exception)
if (WUStateWait != state)
{
Owned<IWUException> we = wu->createException();
we->setSeverity(SeverityInformation);
we->setSeverity(SeverityError);
StringBuffer errStr;
exception->errorMessage(errStr);
we->setExceptionMessage(errStr);
Expand Down
134 changes: 58 additions & 76 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class CRegistryServer : public CSimpleInterface
watchdog->addSlave(ep);
++slavesRegistered;
}
bool connect(unsigned slaves)
void connect(unsigned slaves)
{
IPointerArrayOf<INode> connectedSlaves;
connectedSlaves.ensureCapacity(slaves);
Expand All @@ -287,10 +287,7 @@ class CRegistryServer : public CSimpleInterface
{
::Release(_sender);
if (registerTM.timedout())
{
WARNLOG("Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Timeout waiting for all workers to register within timeout period (%u mins)", maxRegistrationMins);

if (isContainerized())
{
Expand All @@ -307,8 +304,7 @@ class CRegistryServer : public CSimpleInterface
if (NotFound != connectedSlaves.find(sender))
{
StringBuffer epStr;
PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
return false;
throw makeStringExceptionV(TE_AbortException, "Same slave registered twice!! : %s", sender->endpoint().getEndpointHostText(epStr).str());
}

/* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
Expand Down Expand Up @@ -395,13 +391,11 @@ class CRegistryServer : public CSimpleInterface
msg.append(masterSlaveMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
{
PROGLOG("Failed to initialize slaves");
return false;
}
throw makeStringException(TE_AbortException, "Failed to initialize slaves");

// Wait for confirmation from slaves
PROGLOG("Initialization sent to slave group");
Owned<IException> exception;
try
{
while (slavesRegistered < slaves)
Expand Down Expand Up @@ -429,11 +423,7 @@ class CRegistryServer : public CSimpleInterface
{
Owned<IException> e = deserializeException(msg);
EXCLOG(e, "Registration error");
if (TE_FailedToRegisterSlave == e->errorCode())
{
setExitCode(0); // to avoid thor auto-recycling
return false;
}
throw e.getClear();
}
registerNode(sender-1);
}
Expand All @@ -445,23 +435,21 @@ class CRegistryServer : public CSimpleInterface
{
CMessageBuffer msg;
if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
{
PROGLOG("Failed to acknowledge slave %d registration", s+1);
return false;
}
throw makeStringExceptionV(TE_AbortException, "Failed to acknowledge slave %d registration", s+1);
}
if (watchdog)
watchdog->start();
deregistrationWatch.start();
return true;
return;
}
catch (IException *e)
{
EXCLOG(e, "Slave registration exception");
e->Release();
exception.setown(e);
}
shutdown();
return false;
if (exception)
throw exception.getClear();
}
void stop()
{
Expand Down Expand Up @@ -999,7 +987,6 @@ int main( int argc, const char *argv[] )
kjServiceMpTag = allocateClusterMPTag();

unsigned numWorkers = 0;
bool doWorkerRegistration = false;
if (isContainerized())
{
saveWuidToFile(workunit);
Expand Down Expand Up @@ -1044,14 +1031,11 @@ int main( int argc, const char *argv[] )
StringBuffer myEp;
getRemoteAccessibleHostText(myEp, queryMyNode()->endpoint());

workerNSInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true);
if (workerNSInstalled)
{
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
workerJobInstalled = k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob);
if (workerJobInstalled)
doWorkerRegistration = true;
}
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "networkpolicy", { }, false, true))
throw makeStringException(TE_AbortException, "Failed to apply worker networkpolicy manifest");
k8s::KeepJobs keepJob = k8s::translateKeepJobs(globals->queryProp("@keepJobs"));
if (!k8s::applyYaml("thorworker", workunit, cloudJobName, "job", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false, k8s::KeepJobs::none == keepJob))
throw makeStringException(TE_AbortException, "Failed to apply worker job manifest");
}
else
{
Expand All @@ -1064,7 +1048,6 @@ int main( int argc, const char *argv[] )
unsigned numWorkersPerNode = globals->getPropInt("@slavesPerNode", 1);
setClusterGroup(queryMyNode(), rawGroup, numWorkersPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
numWorkers = queryNodeClusterWidth();
doWorkerRegistration = true;
if (numWorkersPerNode > 1)
{
// Split memory based on numWorkersPerNode
Expand All @@ -1075,41 +1058,44 @@ int main( int argc, const char *argv[] )
}
}

if (doWorkerRegistration && registry->connect(numWorkers))
registry->connect(numWorkers);
if (!isContainerized())
{
// bare-metal - check health of dafilesrv's on the Thor cluster.
if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
{
FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
}
}

unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
unsigned totSlaveProcs = queryNodeClusterWidth();
for (unsigned s=0; s<totSlaveProcs; s++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
StringBuffer slaveStr;
for (unsigned c=0; c<channelsPerWorker; c++)
{
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
unsigned o = s + (c * totSlaveProcs);
if (c)
slaveStr.append(",");
slaveStr.append(o+1);
}
StringBuffer virtStr;
if (channelsPerWorker>1)
virtStr.append("virtual slaves:");
else
virtStr.append("slave:");
PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
}

PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");
PROGLOG("verifying mp connection to rest of cluster");
if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
throwStringExceptionV(0, "Failed to connect to all nodes");
PROGLOG("verified mp connection to rest of cluster");

#ifdef _CONTAINERIZED
if (globals->getPropBool("@_dafsStorage"))
{
if (globals->getPropBool("@_dafsStorage"))
{
/* NB: This option is a developer option only.
* It is intended to be used to bring up a temporary Thor instance that uses local node storage,
Expand All @@ -1128,32 +1114,28 @@ int main( int argc, const char *argv[] )
* NB: This isn't a real StoragePlane, and it will not be accessible by any other component.
*
*/
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
StringBuffer uniqueGrpName;
queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
// change default plane
getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
}
#endif
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;
LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
auditStartLogged = true;

writeSentinelFile(sentinelFile);
writeSentinelFile(sentinelFile);

#ifndef _CONTAINERIZED
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
if (pinterval)
startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
#endif
configurePreferredPlanes();
configurePreferredPlanes();

// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
}
else
PROGLOG("Registration aborted");
registry.clear();
// NB: workunit/graphName only set in one-shot mode (if isCloud())
thorMain(logHandler, workunit, graphName);
LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
}
catch (IException *e)
Expand Down

0 comments on commit a4f279c

Please sign in to comment.