Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 18, 2024
2 parents 4317673 + e52b380 commit ea1047a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class PeriodicTimer

protected:
cycle_t timePeriodCycles = 0;
cycle_t lastElapsedCycles = 0;
std::atomic<cycle_t> lastElapsedCycles{0};
};


Expand Down
20 changes: 10 additions & 10 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2055,7 +2055,7 @@ void CFileIO::setSize(offset_t pos)

//-- Unix implementation ----------------------------------------------------

static void doSync(int fd, bool dataOnly)
static void doSync(const CFileIO &fileIO, int fd, bool dataOnly)
{
#ifdef F_FULLFSYNC
// No EIO type retry available
Expand All @@ -2066,25 +2066,25 @@ static void doSync(int fd, bool dataOnly)
if (ret == 0)
{
if (timer.elapsedMs() >= 10000)
IWARNLOG("doSync: slow success: took %u ms", timer.elapsedMs());
IWARNLOG("doSync(%s): slow success: took %u ms", fileIO.querySafeFilename(), timer.elapsedMs());
}
else
{
int err = errno;
printStackReport();
Owned<IException> e = makeErrnoExceptionV(err, "doSync: failed after %u ms", timer.elapsedMs());
Owned<IException> e = makeErrnoExceptionV(err, "doSync(%s): failed after %u ms", fileIO.querySafeFilename(), timer.elapsedMs());
OWARNLOG(e);
throw e.getClear();
}
#endif
}

static void syncFileData(int fd, bool notReadOnly, IFEflags extraFlags, bool wait_previous=false)
static void syncFileData(const CFileIO &fileIO, int fd, bool notReadOnly, IFEflags extraFlags, bool wait_previous=false)
{
if (notReadOnly)
{
if (extraFlags & IFEsync)
doSync(fd, true);
doSync(fileIO, fd, true);
#if defined(__linux__)
else if (extraFlags & IFEnocache)
{
Expand Down Expand Up @@ -2176,9 +2176,9 @@ void CFileIO::close()
DBGLOG("CFileIO::close(%d), extraFlags = %d", tmpHandle, extraFlags);
#endif
if (extraFlags & (IFEnocache | IFEsync))
syncFileData(tmpHandle, openmode!=IFOread, extraFlags, false);
syncFileData(*this, tmpHandle, openmode!=IFOread, extraFlags, false);
else if (extraFlags & IFEsyncAtClose)
doSync(tmpHandle, false);
doSync(*this, tmpHandle, false);

if (::close(tmpHandle) < 0)
throw makeErrnoExceptionV(errno, "CFileIO::close for file '%s'", querySafeFilename());
Expand All @@ -2192,7 +2192,7 @@ void CFileIO::flush()

CriticalBlock procedure(cs);

syncFileData(file, true, extraFlags, false);
syncFileData(*this, file, true, extraFlags, false);
}


Expand Down Expand Up @@ -2229,7 +2229,7 @@ size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
if (unflushedReadBytes.add_fetch(ret) >= PGCFLUSH_BLKSIZE)
{
unflushedReadBytes.store(0);
syncFileData(file, false, extraFlags, false);
syncFileData(*this, file, false, extraFlags, false);
}
}
return ret;
Expand Down Expand Up @@ -2259,7 +2259,7 @@ size32_t CFileIO::write(offset_t pos, size32_t len, const void * data)
{
unflushedWriteBytes.store(0);
// request to write-out dirty pages
syncFileData(file, true, extraFlags, true);
syncFileData(*this, file, true, extraFlags, true);
}
}
return ret;
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jmisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ void throwExceptionIfAborting()

StringBuffer & hexdump2string(byte const * in, size32_t inSize, StringBuffer & out)
{
out.append("[");
out.appendf("%u bytes [", inSize);
byte last = 0;
unsigned seq = 1;
for(unsigned i=0; i<inSize; ++i)
Expand Down
16 changes: 4 additions & 12 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,10 @@ class CSocket: public ISocket, public CInterface
enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state;
T_SOCKET sock;
// char* hostname; // host address
unsigned short hostport; // host port
unsigned short hostport; // host port (NB: this is the peer port if an attached socket)
unsigned short localPort;
SOCKETMODE sockmode;
IpAddress targetip;
IpAddress targetip; // NB: this is peer if an attached socket
SocketEndpoint returnep; // set by set_return_addr

MCASTREQ * mcastreq;
Expand Down Expand Up @@ -1472,16 +1472,8 @@ SocketEndpoint &CSocket::getPeerEndpoint(SocketEndpoint &ep)
if (sockmode==sm_udp_server) { // udp server
ep.set(returnep);
}
else {
DEFINE_SOCKADDR(u);
socklen_t ul = sizeof(u);
if (::getpeername(sock,&u.sa, &ul)<0) {
DBGLOG("getpeername failed %d",SOCKETERRNO());
ep.set(NULL, 0);
}
else
getSockAddrEndpoint(u,ul,ep);
}
else
ep.set(hostport, targetip); // NB: if this is an attached socket, targetip/hostpost are the peer
return ep;
}

Expand Down
40 changes: 28 additions & 12 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ struct MultiPacketHeader

class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
{
StringAttr msg;
public:
IMPLEMENT_IINTERFACE;

CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
CMPException(MessagePassingError err,const SocketEndpoint &ep, const char *_msg = nullptr) : error(err), endpoint(ep), msg(_msg)
{
}

Expand All @@ -240,6 +241,8 @@ class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
// change it from "MP link closed" to something more helpful
case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getEndpointHostText(tmp).str()); break;
}
if (msg.length())
str.append(" - ").append(msg);
return str;
}
int errorCode() const { return error; }
Expand Down Expand Up @@ -1813,6 +1816,8 @@ class ForwardPacketHandler // TAG_SYS_FORWARD
};


static PeriodicTimer periodicTimer(10*60*1000, false); // 10 minutes
static std::atomic<__uint64> mpProtocolErrors{0};
// --------------------------------------------------------

class CMPPacketReader: public ISocketSelectNotify, public CInterface
Expand Down Expand Up @@ -1847,7 +1852,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
{
if (!parent)
return false;
bool gc = false; // if a gc is hit, then will fall through to close socket
bool closeSocket = false; // if a graceful close is hit, this will be set and will fall through to close socket
bool suppressException = false;
try
{
while (true) // NB: breaks out if blocked (if (remaining) ..)
Expand All @@ -1872,7 +1878,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (!gotPacketHdr)
{
CCycleTimer timer;
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
remaining -= szRead;
activeptr += szRead;
if (remaining) // only possible if blocked.
Expand All @@ -1882,10 +1888,20 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100)
{
// TBD IPV6 here
mpProtocolErrors++;
SocketEndpoint ep;
sock->getPeerEndpoint(ep);
IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
throw e;
if (periodicTimer.hasElapsed())
{
VStringBuffer packetHdrBytes("[%" I64F "u incidents to date]. Packet Header: ", mpProtocolErrors.load());
hexdump2string((byte const *)&hdr, sizeof(hdr), packetHdrBytes);
throw new CMPException(MPERR_protocol_version_mismatch, ep, packetHdrBytes.str());
}
else
{
suppressException = true;
throw new CMPException(MPERR_protocol_version_mismatch, ep);
}
}
hdr.setMessageFields(*activemsg);
#ifdef _FULLTRACE
Expand All @@ -1898,9 +1914,9 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
gotPacketHdr = true;
}

if (!gc && remaining)
if (!closeSocket && remaining)
{
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
remaining -= szRead;
activeptr += szRead;
}
Expand Down Expand Up @@ -1939,19 +1955,19 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
}
}
while (activemsg);
if (gc)
if (closeSocket)
break;
}
}
catch (IException *e)
{
if (e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e,"MP(Packet Reader)");
if (!suppressException && e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e, "MP(Packet Reader)");
e->Release();
gotPacketHdr = false;
closeSocket = true; // NB: this select handler will removed and not be notified again
}

if (gc)
if (closeSocket)
{
// here due to error or graceful close, so close socket (ignore error as may be closed already)
try
Expand Down

0 comments on commit ea1047a

Please sign in to comment.