Skip to content

Commit

Permalink
Merge pull request #17825 from jakesmith/HPCC-30366-remove-unused-CMa…
Browse files Browse the repository at this point in the history
…sterWatchdogUDP

HPCC-30366 Remove unused CMasterWatchdogUDP

Reviewed-By: Shamser Ahmed <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Sep 27, 2023
2 parents b23a71b + 7393d7c commit 69d8686
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 96 deletions.
114 changes: 27 additions & 87 deletions thorlcr/master/mawatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ class CMachineStatus
};


CMasterWatchdogBase::CMasterWatchdogBase() : threaded("CMasterWatchdogBase")
CMasterWatchdog::CMasterWatchdog(bool startNow) : threaded("CMasterWatchdogBase")
{
stopped = true;
watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
watchdogMachineTimeout *= 1000;
if (startNow)
start();
}

CMasterWatchdogBase::~CMasterWatchdogBase()
CMasterWatchdog::~CMasterWatchdog()
{
stop();
ForEachItemInRev(i, state)
Expand All @@ -80,7 +82,7 @@ CMasterWatchdogBase::~CMasterWatchdogBase()
}
}

void CMasterWatchdogBase::start()
void CMasterWatchdog::start()
{
if (stopped)
{
Expand All @@ -93,14 +95,14 @@ void CMasterWatchdogBase::start()
}
}

void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave)
void CMasterWatchdog::addSlave(const SocketEndpoint &slave)
{
synchronized block(mutex);
CMachineStatus *mstate=new CMachineStatus(slave);
state.append(mstate);
}

void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
{
synchronized block(mutex);
CMachineStatus *ms = findSlave(slave);
Expand All @@ -110,7 +112,7 @@ void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
}
}

CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
{
ForEachItemInRev(i, state)
{
Expand All @@ -122,7 +124,7 @@ CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
}


void CMasterWatchdogBase::stop()
void CMasterWatchdog::stop()
{
{
synchronized block(mutex);
Expand All @@ -140,7 +142,7 @@ void CMasterWatchdogBase::stop()
LOG(MCdebugProgress, thorJob, "Stopped watchdog");
}

void CMasterWatchdogBase::checkMachineStatus()
void CMasterWatchdog::checkMachineStatus()
{
synchronized block(mutex);
ForEachItemInRev(i, state)
Expand All @@ -165,7 +167,7 @@ void CMasterWatchdogBase::checkMachineStatus()
}
}

unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
unsigned CMasterWatchdog::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
{
mb.clear();
unsigned read = readData(mb);
Expand All @@ -185,7 +187,20 @@ unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer
return 0;
}

void CMasterWatchdogBase::threadmain()
unsigned CMasterWatchdog::readData(MemoryBuffer &mb)
{
CMessageBuffer msg;
if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout))
return 0;
mb.swapWith(msg);
return mb.length();
}
void CMasterWatchdog::stopReading()
{
queryNodeComm().cancel(0, MPTAG_THORWATCHDOG);
}

void CMasterWatchdog::threadmain()
{
LOG(MCdebugProgress, thorJob, "Started watchdog");
unsigned lastbeat=msTick();
Expand Down Expand Up @@ -258,84 +273,9 @@ void CMasterWatchdogBase::threadmain()
}
}


class CMasterWatchdogUDP : public CMasterWatchdogBase
{
ISocket *sock;
public:
CMasterWatchdogUDP(bool startNow)
{
sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
if (startNow)
start();
}
~CMasterWatchdogUDP()
{
::Release(sock);
}
virtual unsigned readData(MemoryBuffer &mb)
{
size32_t read;
try
{
sock->readtms(mb.reserveTruncate(UDP_DATA_MAX), sizeof(HeartBeatPacketHeader), UDP_DATA_MAX, read, watchdogMachineTimeout);
}
catch (IJSOCK_Exception *e)
{
if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
throw;
e->Release();
return 0; // will retry
}
return read;
}
virtual void stopReading()
{
if (sock)
{
SocketEndpoint masterEp(getMasterPortBase());
StringBuffer ipStr;
masterEp.getHostText(ipStr);
Owned<ISocket> sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
// send empty packet, stopped set, will cease reading
HeartBeatPacketHeader hb;
hb.packetSize = sizeof(HeartBeatPacketHeader);
sock->write(&hb, sizeof(HeartBeatPacketHeader));
sock->close();
}
}
};

/////////////////////

class CMasterWatchdogMP : public CMasterWatchdogBase
{
public:
CMasterWatchdogMP(bool startNow)
{
if (startNow)
start();
}
virtual unsigned readData(MemoryBuffer &mb)
{
CMessageBuffer msg;
if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout))
return 0;
mb.swapWith(msg);
return mb.length();
}
virtual void stopReading()
{
queryNodeComm().cancel(0, MPTAG_THORWATCHDOG);
}
};

/////////////////////

CMasterWatchdogBase *createMasterWatchdog(bool udp, bool startNow)
CMasterWatchdog *createMasterWatchdog(bool startNow)
{
if (udp)
return new CMasterWatchdogUDP(startNow);
else
return new CMasterWatchdogMP(startNow);
return new CMasterWatchdog(startNow);
}
14 changes: 7 additions & 7 deletions thorlcr/master/mawatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class CMachineStatus;
struct HeartBeatPacketHeader;

class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
class CMasterWatchdog : public CSimpleInterface, implements IThreaded
{
PointerArray state;
SocketEndpoint master;
Expand All @@ -37,22 +37,22 @@ class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
bool stopped;
unsigned watchdogMachineTimeout;
public:
CMasterWatchdogBase();
~CMasterWatchdogBase();
CMasterWatchdog(bool startNow);
~CMasterWatchdog();
void addSlave(const SocketEndpoint &slave);
void removeSlave(const SocketEndpoint &slave);
CMachineStatus *findSlave(const SocketEndpoint &ep);
void checkMachineStatus();
unsigned readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb);
void start();
void stop();
unsigned readData(MemoryBuffer &mb);
void stopReading();
// IThredaed
virtual void threadmain() override;

virtual unsigned readData(MemoryBuffer &mb) = 0;
virtual void stopReading() = 0;
};

CMasterWatchdogBase *createMasterWatchdog(bool udp=false, bool startNow=false);
CMasterWatchdog *createMasterWatchdog(bool startNow=false);

#endif

4 changes: 2 additions & 2 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class CRegistryServer : public CSimpleInterface
}
} deregistrationWatch;
public:
Linked<CMasterWatchdogBase> watchdog;
Linked<CMasterWatchdog> watchdog;
IBitSet *status;

CRegistryServer() : deregistrationWatch(*this)
Expand All @@ -209,7 +209,7 @@ class CRegistryServer : public CSimpleInterface
msgDelay = SLAVEREG_VERIFY_DELAY;
slavesRegistered = 0;
if (globals->getPropBool("@watchdogEnabled"))
watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
watchdog.setown(createMasterWatchdog());
else
globals->setPropBool("@watchdogProgressEnabled", false);
CriticalBlock b(regCrit);
Expand Down

0 comments on commit 69d8686

Please sign in to comment.