Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33016 Clearup lingerPeriod conditional code #19335

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 34 additions & 37 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1441,9 +1441,10 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
else
{
unsigned lingerPeriod = globals->getPropInt("@lingerPeriod", defaultThorLingerPeriod)*1000;
dbgassertex(lingerPeriod>=1000); // NB: the schema or the default ensure the linger period is non-zero
bool multiJobLinger = globals->getPropBool("@multiJobLinger", defaultThorMultiJobLinger);
VStringBuffer multiJobLingerQueueName("%s_lingerqueue", globals->queryProp("@name"));
StringBuffer instance("thorinstance_"); // only used when multiJobLinger = false (and lingerPeriod>0)
StringBuffer instance("thorinstance_"); // only used when multiJobLinger = false

if (multiJobLinger)
{
Expand All @@ -1458,7 +1459,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
thorQueue->connect(false);
}

if (!multiJobLinger && lingerPeriod)
if (!multiJobLinger)
{
// We avoid using getEndpointHostText here and get an IP instead, because the client pod communicating directly with this Thor manager,
// will not have the ability to resolve this pods hostname.
Expand Down Expand Up @@ -1504,7 +1505,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);

Owned<IWorkUnit> w = &workunit->lock();
if (!multiJobLinger && lingerPeriod)
if (!multiJobLinger)
w->setDebugValue(instance, "1", true);

if (jobManager->queryExitException())
Expand Down Expand Up @@ -1532,48 +1533,44 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
}

currentGraphName.clear();
if (lingerPeriod)

PROGLOG("Lingering time left: %.2f", ((float)lingerPeriod)/1000);
StringBuffer nextJob;
CTimeMon timer(lingerPeriod);
unsigned remaining;
while (!timer.timedout(&remaining))
{
PROGLOG("Lingering time left: %.2f", ((float)lingerPeriod)/1000);
StringBuffer nextJob;
CTimeMon timer(lingerPeriod);
unsigned remaining;
while (!timer.timedout(&remaining))
StringBuffer wuid;
int ret = recvNextGraph(remaining, currentWuid.str(), wuid, currentGraphName);
if (ret > 0)
{
StringBuffer wuid;
int ret = recvNextGraph(remaining, currentWuid.str(), wuid, currentGraphName);
if (ret > 0)
{
currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
break; // success
}
else if (ret < 0)
break; // timeout/abort
// else - reject/ignore duff message.
currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
break; // success
}
else if (ret < 0)
break; // timeout/abort
// else - reject/ignore duff message.
}

// The following is true if no workunit/graph have been received
// MORE: I think it should also be executed if lingerPeriod is 0
if (0 == currentGraphName.length())
if (0 == currentGraphName.length())
{
if (!multiJobLinger)
{
if (!multiJobLinger)
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
//Unlikely, but the workunit could have been deleted while we were lingering
//currentWuid can also be blank if the workunit this started for died before thor started
//processing the graph. This test covers both (unlikely) situations.
if (workunit)
{
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
//Unlikely, but the workunit could have been deleted while we were lingering
//currentWuid can also be blank if the workunit this started for died before thor started
//processing the graph. This test covers both (unlikely) situations.
if (workunit)
{
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
}
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
}
break;
}
break;
}
}
thorQueue.clear();
Expand Down
Loading