Skip to content

Commit

Permalink
HPCC-32072 Thor worker cleaner shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Jul 29, 2024
1 parent eb86455 commit 9371db4
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 11 deletions.
16 changes: 11 additions & 5 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ class CMPServer: private CMPChannelHT, implements IMPServer
bool tryReopenChannel = false;
bool useTLS = false;
unsigned mpTraceLevel = 0;
bool dumpQueue = true;

// packet handlers
PingPacketHandler *pingpackethandler; // TAG_SYS_PING
Expand Down Expand Up @@ -2718,10 +2719,13 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
CMPServer::~CMPServer()
{
#ifdef _TRACEORPHANS
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
if (dumpQueue)
{
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
}
#endif
_releaseAll();
selecthandler->stop(true);
Expand Down Expand Up @@ -3619,6 +3623,7 @@ class CGlobalMPServer : public CMPServer
// Returns the current value of nestLevel.
unsigned queryNest() { return nestLevel; }
// Reports whether the server is currently flagged as paused.
bool isPaused() const { return paused; }
// Sets or clears the paused flag.
void setPaused(bool onOff) { paused = onOff; }
// Sets the inherited dumpQueue flag; the CMPServer destructor checks it before
// logging the "MP: Orphan check" receive-queue details (when _TRACEORPHANS is defined).
void setDumpQueue(bool onOff) { dumpQueue = onOff; }
};
CriticalSection CGlobalMPServer::sect;
static CGlobalMPServer *globalMPServer;
Expand Down Expand Up @@ -3663,7 +3668,7 @@ void startMPServer(unsigned port, bool paused, bool listen)
startMPServer(0, port, paused, listen);
}

void stopMPServer()
void stopMPServer(bool dumpQueue)
{
CGlobalMPServer *_globalMPServer = NULL;
{
Expand All @@ -3682,6 +3687,7 @@ void stopMPServer()
}
if (NULL == _globalMPServer)
return;
_globalMPServer->setDumpQueue(dumpQueue);
_globalMPServer->stop();
_globalMPServer->Release();
#ifdef _TRACE
Expand Down
2 changes: 1 addition & 1 deletion system/mp/mpcomm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ interface IMPServer : extends IInterface

extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false);
extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false);
extern mp_decl void stopMPServer();
extern mp_decl void stopMPServer(bool dumpQueue=true);
extern mp_decl IMPServer *getMPServer();
extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false);

Expand Down
6 changes: 5 additions & 1 deletion thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
#include "rtlcommon.hpp"
#include "../activities/keyedjoin/thkeyedjoincommon.hpp"

// Set to true by the job listener when it handles a Shutdown command. Other code
// checks it to treat the teardown as expected, e.g. CKJService skips logging
// IMP_Exception once this is set.
// NOTE(review): this is a plain bool that appears to be written by the listener and
// read from other threads / the signal-handling path - confirm whether it should be
// std::atomic<bool>.
bool recvShutdown = false;

//---------------------------------------------------------------------------

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1497,7 +1499,8 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
}
catch (IMP_Exception *e)
{
EXCLOG(e, nullptr);
if (!recvShutdown)
EXCLOG(e, nullptr);
e->Release();
break;
}
Expand Down Expand Up @@ -2094,6 +2097,7 @@ class CJobListener : public CSimpleInterface
case Shutdown:
{
stopped = true;
recvShutdown = true;
PROGLOG("Shutdown received");
if (watchdog)
watchdog->stop();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/slave/slavmain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void slaveMain(bool &jobListenerStopped, ILogMsgHandler *logHandler);
void enableThorSlaveAsDaliClient();
void disableThorSlaveAsDaliClient();

extern bool recvShutdown;
#endif


45 changes: 41 additions & 4 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
#include "dafdesc.hpp"
#include "rmtfile.hpp"

#include "slavmain.hpp"

#ifdef _CONTAINERIZED
#include "dafsserver.hpp"
#endif
Expand Down Expand Up @@ -285,19 +283,58 @@ bool UnregisterSelf(IException *e)
return false;
}

/**
 * Arrange for signal `signo` to be delivered to this process after `timeoutSec`
 * seconds, using a one-shot POSIX timer on CLOCK_MONOTONIC.
 *
 * Best effort: if the timer cannot be created or armed the function returns
 * silently and no signal is scheduled. This is a no-op on Windows.
 *
 * @param signo       signal number to raise when the timer expires
 * @param timeoutSec  delay in seconds; note that POSIX defines a zero it_value as
 *                    "disarm", so passing 0 schedules nothing
 */
void signalInFuture(int signo, unsigned timeoutSec)
{
#ifndef _WIN32
    // Zero-initialise so no indeterminate fields are handed to the kernel.
    // sigev_value is deliberately left zeroed: the original stored the address of
    // a stack local there, which would dangle by the time the signal is delivered.
    struct sigevent sigev = {};
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = signo;

    // One-shot timer: it_interval stays zero so the timer does not re-arm itself.
    struct itimerspec itSpec = {};
    itSpec.it_value.tv_sec = timeoutSec;

    timer_t timerId;
    if (timer_create(CLOCK_MONOTONIC, &sigev, &timerId) != 0)
        return;

    // If the timer cannot be armed it will never fire, so release it rather than leak it.
    if (timer_settime(timerId, 0, &itSpec, nullptr) != 0)
        timer_delete(timerId);
#endif
}

// Abort/signal callback for the Thor worker.
//
// First invocation: schedules a SIGTERM 20 seconds ahead via signalInFuture(),
// logs which kind of request arrived, and - if the job listener has not already
// stopped - unregisters from the master (when one is known) and aborts the worker.
// Any later invocation terminates the process immediately with exit status
// 128 + the corresponding signal number.
//
// Returns false when a Shutdown has already been received or when unregistering
// succeeded; true otherwise.
// NOTE(review): how the caller interprets the return value is not visible here.
bool ControlHandler(ahType type)
{
    const bool isInterrupt = (ahInterrupt == type);

    // A repeat signal means the graceful path did not finish: exit straight away.
    static bool alreadySignalled = false;
    if (alreadySignalled)
        _exit(128 + (isInterrupt ? SIGINT : SIGTERM));
    alreadySignalled = true;

    // Schedule a follow-up SIGTERM in case the shutdown below stalls.
    signalInFuture(SIGTERM, 20);

    if (isInterrupt)
        LOG(MCdebugProgress, "CTRL-C detected");
    else if (!jobListenerStopped)
        LOG(MCdebugProgress, "SIGTERM detected");

    bool unregistered = false;
    if (!jobListenerStopped)
    {
        if (masterNode)
            unregistered = UnregisterSelf(NULL);
        abortSlave();
    }

    return !recvShutdown && !unregistered;
}

Expand Down Expand Up @@ -599,7 +636,7 @@ int main( int argc, const char *argv[] )
setMultiThorMemoryNotify(0,NULL);
roxiemem::releaseRoxieHeap();

if (unregisterException.get())
if (!recvShutdown && unregisterException.get())
UnregisterSelf(unregisterException);

if (getExpertOptBool("slaveDaliClient"))
Expand All @@ -608,7 +645,7 @@ int main( int argc, const char *argv[] )
#ifdef USE_MP_LOG
stopLogMsgReceivers();
#endif
stopMPServer();
stopMPServer(!recvShutdown);
releaseAtoms(); // don't know why we can't use a module_exit to destruct these...

ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking
Expand Down

0 comments on commit 9371db4

Please sign in to comment.