Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32072 Thor worker cleaner shutdown #18771

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions system/jlib/jexcept.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <sys/wait.h>
#include <sys/types.h>
#include <stddef.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#ifdef __linux__
#include <execinfo.h> // comment out if not present
Expand Down Expand Up @@ -1663,6 +1665,30 @@ void jlib_decl disableSEHtoExceptionMapping()
#endif
}

void jlib_decl raiseSignalInFuture(int signo, unsigned timeoutSec)
{
#if defined(__linux__)
int ret;
timer_t timerId;
struct sigevent sigev;
struct itimerspec itSpec;

sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = signo;
sigev.sigev_value.sival_ptr = &timerId;
sigev.sigev_notify_function = nullptr;
sigev.sigev_notify_attributes = NULL;

itSpec.it_value.tv_sec = timeoutSec;
itSpec.it_value.tv_nsec = 0;
itSpec.it_interval.tv_sec = 0;
itSpec.it_interval.tv_nsec = 0;

ret = timer_create(CLOCK_MONOTONIC, &sigev, &timerId);
if (!ret)
timer_settime(timerId, 0, &itSpec, 0);
#endif
}

StringBuffer & formatSystemError(StringBuffer & out, unsigned errcode)
{
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jexcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ void jlib_decl disableSEHtoExceptionMapping();

void jlib_decl *setSEHtoExceptionHandler(IExceptionHandler *handler); // sets handler and return old value

void jlib_decl raiseSignalInFuture(int signo, unsigned timeoutSec);

void jlib_decl setTerminateOnSEHInSystemDLLs(bool set=true);
void jlib_decl setTerminateOnSEH(bool set=true);
Expand Down
16 changes: 11 additions & 5 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ class CMPServer: private CMPChannelHT, implements IMPServer
bool tryReopenChannel = false;
bool useTLS = false;
unsigned mpTraceLevel = 0;
bool dumpQueue = true;

// packet handlers
PingPacketHandler *pingpackethandler; // TAG_SYS_PING
Expand Down Expand Up @@ -2718,10 +2719,13 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
CMPServer::~CMPServer()
{
#ifdef _TRACEORPHANS
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
if (dumpQueue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really an incidental change? Why suppress dumping of unconsumed MP messages if slave has been told to shutdown by manager, vs if it hasn't?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On clean shutdown I didn't want the MP Orphan check dump in the logs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I don't think message are useful in general.

{
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
}
#endif
_releaseAll();
selecthandler->stop(true);
Expand Down Expand Up @@ -3619,6 +3623,7 @@ class CGlobalMPServer : public CMPServer
unsigned queryNest() { return nestLevel; }
bool isPaused() const { return paused; }
void setPaused(bool onOff) { paused = onOff; }
void setDumpQueue(bool onOff) { dumpQueue = onOff; }
};
CriticalSection CGlobalMPServer::sect;
static CGlobalMPServer *globalMPServer;
Expand Down Expand Up @@ -3663,7 +3668,7 @@ void startMPServer(unsigned port, bool paused, bool listen)
startMPServer(0, port, paused, listen);
}

void stopMPServer()
void stopMPServer(bool dumpQueue)
{
CGlobalMPServer *_globalMPServer = NULL;
{
Expand All @@ -3682,6 +3687,7 @@ void stopMPServer()
}
if (NULL == _globalMPServer)
return;
_globalMPServer->setDumpQueue(dumpQueue);
_globalMPServer->stop();
_globalMPServer->Release();
#ifdef _TRACE
Expand Down
2 changes: 1 addition & 1 deletion system/mp/mpcomm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ interface IMPServer : extends IInterface

extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false);
extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false);
extern mp_decl void stopMPServer();
extern mp_decl void stopMPServer(bool dumpQueue=true);
extern mp_decl IMPServer *getMPServer();
extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false);

Expand Down
6 changes: 5 additions & 1 deletion thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
#include "rtlcommon.hpp"
#include "../activities/keyedjoin/thkeyedjoincommon.hpp"

bool recvShutdown = false;

//---------------------------------------------------------------------------

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1497,7 +1499,8 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
}
catch (IMP_Exception *e)
{
EXCLOG(e, nullptr);
if (!recvShutdown)
EXCLOG(e, nullptr);
e->Release();
break;
}
Expand Down Expand Up @@ -2094,6 +2097,7 @@ class CJobListener : public CSimpleInterface
case Shutdown:
{
stopped = true;
recvShutdown = true;
PROGLOG("Shutdown received");
if (watchdog)
watchdog->stop();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/slave/slavmain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void slaveMain(bool &jobListenerStopped, ILogMsgHandler *logHandler);
void enableThorSlaveAsDaliClient();
void disableThorSlaveAsDaliClient();

extern bool recvShutdown;
#endif


20 changes: 16 additions & 4 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
#include "dafdesc.hpp"
#include "rmtfile.hpp"

#include "slavmain.hpp"

#ifdef _CONTAINERIZED
#include "dafsserver.hpp"
#endif
Expand Down Expand Up @@ -287,17 +285,31 @@ bool UnregisterSelf(IException *e)

bool ControlHandler(ahType type)
{
static bool recvdSig = false;
if (recvdSig)
{
if (ahInterrupt == type)
_exit(128+SIGINT);
else
_exit(128+SIGTERM);
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
}
recvdSig = true;
raiseSignalInFuture(SIGTERM, 20);

if (ahInterrupt == type)
LOG(MCdebugProgress, "CTRL-C detected");
else if (!jobListenerStopped)
LOG(MCdebugProgress, "SIGTERM detected");

bool unregOK = false;
if (!jobListenerStopped)
{
if (masterNode)
unregOK = UnregisterSelf(NULL);
abortSlave();
}
if (recvShutdown)
return false;
return !unregOK;
}

Expand Down Expand Up @@ -599,7 +611,7 @@ int main( int argc, const char *argv[] )
setMultiThorMemoryNotify(0,NULL);
roxiemem::releaseRoxieHeap();

if (unregisterException.get())
if (!recvShutdown && unregisterException.get())
UnregisterSelf(unregisterException);

if (getExpertOptBool("slaveDaliClient"))
Expand All @@ -608,7 +620,7 @@ int main( int argc, const char *argv[] )
#ifdef USE_MP_LOG
stopLogMsgReceivers();
#endif
stopMPServer();
stopMPServer(!recvShutdown);
releaseAtoms(); // don't know why we can't use a module_exit to destruct these...

ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking
Expand Down
Loading