diff --git a/thorlcr/master/mawatchdog.cpp b/thorlcr/master/mawatchdog.cpp index 4d7c13e74b8..3290bb5da12 100644 --- a/thorlcr/master/mawatchdog.cpp +++ b/thorlcr/master/mawatchdog.cpp @@ -61,16 +61,18 @@ class CMachineStatus }; -CMasterWatchdogBase::CMasterWatchdogBase() : threaded("CMasterWatchdogBase") +CMasterWatchdog::CMasterWatchdog(bool startNow) : threaded("CMasterWatchdogBase") { stopped = true; watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT); if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10) watchdogMachineTimeout = HEARTBEAT_INTERVAL*10; watchdogMachineTimeout *= 1000; + if (startNow) + start(); } -CMasterWatchdogBase::~CMasterWatchdogBase() +CMasterWatchdog::~CMasterWatchdog() { stop(); ForEachItemInRev(i, state) @@ -80,7 +82,7 @@ CMasterWatchdogBase::~CMasterWatchdogBase() } } -void CMasterWatchdogBase::start() +void CMasterWatchdog::start() { if (stopped) { @@ -93,14 +95,14 @@ void CMasterWatchdogBase::start() } } -void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave) +void CMasterWatchdog::addSlave(const SocketEndpoint &slave) { synchronized block(mutex); CMachineStatus *mstate=new CMachineStatus(slave); state.append(mstate); } -void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave) +void CMasterWatchdog::removeSlave(const SocketEndpoint &slave) { synchronized block(mutex); CMachineStatus *ms = findSlave(slave); @@ -110,7 +112,7 @@ void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave) } } -CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep) +CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep) { ForEachItemInRev(i, state) { @@ -122,7 +124,7 @@ CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep) } -void CMasterWatchdogBase::stop() +void CMasterWatchdog::stop() { { synchronized block(mutex); @@ -140,7 +142,7 @@ void CMasterWatchdogBase::stop() LOG(MCdebugProgress, thorJob, "Stopped watchdog"); } -void CMasterWatchdogBase::checkMachineStatus() +void CMasterWatchdog::checkMachineStatus() { synchronized block(mutex); ForEachItemInRev(i, state) @@ -165,7 +167,7 @@ void CMasterWatchdogBase::checkMachineStatus() } } -unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb) +unsigned CMasterWatchdog::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb) { mb.clear(); unsigned read = readData(mb); @@ -185,7 +187,20 @@ unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer return 0; } -void CMasterWatchdogBase::threadmain() +unsigned CMasterWatchdog::readData(MemoryBuffer &mb) +{ + CMessageBuffer msg; + if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout)) + return 0; + mb.swapWith(msg); + return mb.length(); +} +void CMasterWatchdog::stopReading() +{ + queryNodeComm().cancel(0, MPTAG_THORWATCHDOG); +} + +void CMasterWatchdog::threadmain() { LOG(MCdebugProgress, thorJob, "Started watchdog"); unsigned lastbeat=msTick(); @@ -258,84 +273,9 @@ void CMasterWatchdogBase::threadmain() } } - -class CMasterWatchdogUDP : public CMasterWatchdogBase -{ - ISocket *sock; -public: - CMasterWatchdogUDP(bool startNow) - { - sock = ISocket::udp_create(getFixedPort(TPORT_watchdog)); - if (startNow) - start(); - } - ~CMasterWatchdogUDP() - { - ::Release(sock); - } - virtual unsigned readData(MemoryBuffer &mb) - { - size32_t read; - try - { - sock->readtms(mb.reserveTruncate(UDP_DATA_MAX), sizeof(HeartBeatPacketHeader), UDP_DATA_MAX, read, watchdogMachineTimeout); - } - catch (IJSOCK_Exception *e) - { - if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) - throw; - e->Release(); - return 0; // will retry - } - return read; - } - virtual void stopReading() - { - if (sock) - { - SocketEndpoint masterEp(getMasterPortBase()); - StringBuffer ipStr; - masterEp.getHostText(ipStr); - Owned sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str()); - // send empty packet, stopped set, will cease reading - HeartBeatPacketHeader hb; - hb.packetSize = sizeof(HeartBeatPacketHeader); - sock->write(&hb, sizeof(HeartBeatPacketHeader)); - sock->close(); - } - } -}; - -///////////////////// - -class CMasterWatchdogMP : public CMasterWatchdogBase -{ -public: - CMasterWatchdogMP(bool startNow) - { - if (startNow) - start(); - } - virtual unsigned readData(MemoryBuffer &mb) - { - CMessageBuffer msg; - if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout)) - return 0; - mb.swapWith(msg); - return mb.length(); - } - virtual void stopReading() - { - queryNodeComm().cancel(0, MPTAG_THORWATCHDOG); - } -}; - ///////////////////// -CMasterWatchdogBase *createMasterWatchdog(bool udp, bool startNow) +CMasterWatchdog *createMasterWatchdog(bool startNow) { - if (udp) - return new CMasterWatchdogUDP(startNow); - else - return new CMasterWatchdogMP(startNow); + return new CMasterWatchdog(startNow); } diff --git a/thorlcr/master/mawatchdog.hpp b/thorlcr/master/mawatchdog.hpp index 11d3c8d9fad..6383694882f 100644 --- a/thorlcr/master/mawatchdog.hpp +++ b/thorlcr/master/mawatchdog.hpp @@ -26,7 +26,7 @@ class CMachineStatus; struct HeartBeatPacketHeader; -class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded +class CMasterWatchdog : public CSimpleInterface, implements IThreaded { PointerArray state; SocketEndpoint master; @@ -37,8 +37,8 @@ class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded bool stopped; unsigned watchdogMachineTimeout; public: - CMasterWatchdogBase(); - ~CMasterWatchdogBase(); + CMasterWatchdog(bool startNow); + ~CMasterWatchdog(); void addSlave(const SocketEndpoint &slave); void removeSlave(const SocketEndpoint &slave); CMachineStatus *findSlave(const SocketEndpoint &ep); @@ -46,13 +46,13 @@ class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded unsigned readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb); void start(); void stop(); + unsigned readData(MemoryBuffer &mb); + void stopReading(); +// IThredaed virtual void threadmain() override; - - virtual unsigned readData(MemoryBuffer &mb) = 0; - virtual void stopReading() = 0; }; -CMasterWatchdogBase *createMasterWatchdog(bool udp=false, bool startNow=false); +CMasterWatchdog *createMasterWatchdog(bool startNow=false); #endif diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index f01925ab183..6167d60454a 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -200,7 +200,7 @@ class CRegistryServer : public CSimpleInterface } } deregistrationWatch; public: - Linked watchdog; + Linked watchdog; IBitSet *status; CRegistryServer() : deregistrationWatch(*this) @@ -209,7 +209,7 @@ class CRegistryServer : public CSimpleInterface msgDelay = SLAVEREG_VERIFY_DELAY; slavesRegistered = 0; if (globals->getPropBool("@watchdogEnabled")) - watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog"))); + watchdog.setown(createMasterWatchdog()); else globals->setPropBool("@watchdogProgressEnabled", false); CriticalBlock b(regCrit);