From 38a3844099a83c7110dfc6e5c61399e40900166d Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 12 Sep 2023 17:56:43 +0100 Subject: [PATCH] HPCC-18382 Add hostname to IpAddress Signed-off-by: Gavin Halliday --- dali/base/dafdesc.cpp | 21 ++++++++------ system/jlib/jsocket.cpp | 52 +++++++++++++++-------------------- system/jlib/jsocket.hpp | 11 ++------ thorlcr/master/mawatchdog.cpp | 4 +-- thorlcr/msort/tsortm.cpp | 34 +++++++++++------------ thorlcr/shared/thwatchdog.hpp | 13 +++++++++ thorlcr/slave/slwatchdog.cpp | 2 +- 7 files changed, 71 insertions(+), 66 deletions(-) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index 80b3bed36cf..d624b0c3f62 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -35,6 +35,8 @@ #include "jsecrets.hpp" #include "rmtfile.hpp" +#include + #define INCLUDE_1_OF_1 // whether to use 1_of_1 for single part files #define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite @@ -944,35 +946,38 @@ void getClusterInfo(IPropertyTree &pt, INamedGroupStore *resolver, unsigned flag cgroup.setown(resolver->lookup(grp)); // get nodes from parts if complete (and group 0) if (gi==0) { // don't assume lookup name correct! - SocketEndpoint *eps = (SocketEndpoint *)calloc(np?np:1,sizeof(SocketEndpoint)); MemoryBuffer mb; Owned piter; if (pt.getPropBin("Parts",mb)) piter.setown(deserializePartAttrIterator(mb)); else piter.setown(pt.getElements("Part")); + ForEach(*piter) { IPropertyTree &cpt = piter->query(); unsigned num = cpt.getPropInt("@num"); - if (num>np) { - eps = (SocketEndpoint *)checked_realloc(eps,num*sizeof(SocketEndpoint),np*sizeof(SocketEndpoint),-21); - memset(eps+np,0,(num-np)*sizeof(SocketEndpoint)); + if (num>np) np = num; - } + } + + std::unique_ptr eps(new SocketEndpoint[np?np:1]); + ForEach(*piter) { + IPropertyTree &cpt = piter->query(); + unsigned num = cpt.getPropInt("@num"); const char *node = cpt.queryProp("@node"); if (node&&*node) - eps[num-1].set(node); + eps.get()[num-1].set(node); } unsigned i=0; for (i=0;i ngrp = createIGroup(np,eps); + Owned ngrp = createIGroup(np,eps.get()); if (!cgroup.get()||(ngrp->compare(cgroup)!=GRbasesubset)) cgroup.setown(ngrp.getClear()); } - free(eps); + } ClusterPartDiskMapSpec mspec; IClusterInfo *cluster = createClusterInfo(grp,cgroup,mspec,resolver); diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index 199e66ffa2d..3b6e4d5b032 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -476,7 +476,7 @@ class CSocket: public ISocket, public CInterface friend class CSocketConnectWait; enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state; T_SOCKET sock; - char* hostname; // host address +// char* hostname; // host address unsigned short hostport; // host port unsigned short localPort; SOCKETMODE sockmode; @@ -968,7 +968,7 @@ size32_t CSocket::avail_read() int CSocket::pre_connect (bool block) { - if (NULL == hostname || '\0' == (*hostname)) + if (targetip.isNull()) { StringBuffer err; err.appendf("CSocket::pre_connect - Invalid/missing host IP address raised in : %s, line %d",sanitizeSourceFile(__FILE__), __LINE__); @@ -977,10 +977,6 @@ int CSocket::pre_connect (bool block) } DEFINE_SOCKADDR(u); - if (targetip.isNull()) { - set_return_addr(hostport,hostname); - targetip.ipset(returnep); - } socklen_t ul = setSockAddr(u,targetip,hostport); sock = ::socket(u.sa.sa_family, SOCK_STREAM, targetip.isIp4()?0:PF_INET6); owned = true; @@ -1082,11 +1078,7 @@ void CSocket::open(int listen_queue_size,bool reuseports) DEFINE_SOCKADDR(u); socklen_t ul; - if (hostname) { - if (targetip.isNull()) { - set_return_addr(hostport,hostname); - targetip.ipset(returnep); - } + if (!targetip.isNull()) { ul = setSockAddr(u,targetip,hostport); } else @@ -1705,6 +1697,8 @@ void CSocket::setTraceName(const char * prefix, const char * name) void CSocket::setTraceName() { #ifdef _TRACE + StringBuffer hostname; + targetip.getIpText(hostname); setTraceName("C!", hostname); #endif } @@ -1724,10 +1718,6 @@ ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems) void CSocket::udpconnect() { DEFINE_SOCKADDR(u); - if (targetip.isNull()) { - set_return_addr(hostport,hostname); - targetip.ipset(returnep); - } socklen_t ul = setSockAddr(u,targetip,hostport); sock = ::socket(u.sa.sa_family, SOCK_DGRAM, targetip.isIp4()?0:PF_INET6); #ifdef SOCKTRACE @@ -1759,7 +1749,9 @@ int CSocket::logPollError(unsigned revents, const char *rwstr) if (revents & POLLERR) { StringBuffer errStr; - errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, (hostname?hostname:"NULL"), hostport); + StringBuffer hostname; + targetip.getIpText(hostname); + errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, hostname.str(), hostport); int serror = 0; socklen_t serrlen = sizeof(serror); int srtn = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&serror, &serrlen); @@ -2874,7 +2866,9 @@ void CSocket::set_ttl(unsigned _ttl) void CSocket::logConnectionInfo(unsigned timeoutms, unsigned conn_mstime) { - PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, (hostname?hostname:"NULL"), hostport); + StringBuffer hostname; + targetip.getIpText(hostname); + PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, hostname.str(), hostport); // PrintStackReport(); } @@ -2921,8 +2915,6 @@ CSocket::~CSocket() e->Release(); } } - free(hostname); - hostname = NULL; #ifdef _TRACE free(tracename); tracename = NULL; @@ -2939,8 +2931,8 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name) #endif nagling = true; // until turned off hostport = ep.port; + targetip.ipset(ep); localPort = 0; - hostname = NULL; mcastreq = NULL; #ifdef _TRACE tracename = NULL; @@ -2950,9 +2942,9 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name) mcastreq = new MCASTREQ(name); } else { - if (!name&&!ep.isNull()) - name = ep.getIpText(tmp).str(); - hostname = name?strdup(name):NULL; + //MORE: I don't think the name parameter is needed anymore + if (name && ep.isNull()) + targetip.ipset(name); } sock = INVALID_SOCKET; sockmode = smode; @@ -2966,9 +2958,7 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name) else { StringBuffer hostname; - SocketEndpoint self; - self.setLocalHost(0); - self.getUrlStr(hostname); + targetip.getIpText(hostname); setTraceName("S>", hostname.str()); } #endif @@ -2998,7 +2988,7 @@ CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned) //set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data char peer[256]; hostport = peer_name(peer,sizeof(peer)); - hostname = strdup(peer); + targetip.ipset(peer); SocketEndpoint ep; localPort = getEndpoint(ep).port; #ifdef _TRACE @@ -3608,9 +3598,11 @@ bool IpAddress::ipset(const char *text) { if (text&&*text) { if ((text[0]=='.')&&(text[1]==0)) { + hostname.set(GetCachedHostName()); // Is this better than '.'? ipset(queryHostIP()); return true; } + hostname.set(text); if (decodeNumericIP(text,netaddr)) return true; const char *s; @@ -3623,6 +3615,7 @@ bool IpAddress::ipset(const char *text) return true; } memset(&netaddr,0,sizeof(netaddr)); + hostname.clear(); return false; } @@ -3646,6 +3639,8 @@ inline char * addbyte(char *s,byte b) StringBuffer & IpAddress::getIpText(StringBuffer & out) const { + if (hostname) + return out.append(hostname); if (::isIp4(netaddr)) { const byte *ip = (const byte *)&netaddr[3]; char ips[16]; @@ -7333,6 +7328,3 @@ extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket) e->Release(); } } - -static_assert(sizeof(IpAddress) == 16, "check size of IpAddress"); -static_assert(sizeof(SocketEndpoint) == 20, "check size of SocketEndpoint"); diff --git a/system/jlib/jsocket.hpp b/system/jlib/jsocket.hpp index f21d164a3d2..62a29e666e6 100644 --- a/system/jlib/jsocket.hpp +++ b/system/jlib/jsocket.hpp @@ -83,13 +83,13 @@ enum JSOCKET_ERROR_CODES { class jlib_decl IpAddress { unsigned netaddr[4] = { 0, 0, 0, 0 }; + StringAttr hostname; // not currently serialized public: IpAddress() = default; - IpAddress(const IpAddress& other) { ipset(other); } explicit IpAddress(const char *text) { ipset(text); } bool ipset(const char *text); // sets to NULL if fails or text=NULL - void ipset(const IpAddress& other) { memcpy(&netaddr,&other.netaddr,sizeof(netaddr)); } + void ipset(const IpAddress& other) { *this = other; } bool ipequals(const IpAddress & other) const; int ipcompare(const IpAddress & other) const; // depreciated unsigned iphash(unsigned prev=0) const; @@ -111,14 +111,9 @@ class jlib_decl IpAddress size32_t getNetAddress(size32_t maxsz,void *dst) const; // for internal use - returns 0 if address doesn't fit void setNetAddress(size32_t sz,const void *src); // for internal use + const char * queryHostname() const { return hostname.get(); } inline bool operator == ( const IpAddress & other) const { return ipequals(other); } - inline IpAddress & operator = ( const IpAddress &other ) - { - ipset(other); - return *this; - } - }; struct IpComparator diff --git a/thorlcr/master/mawatchdog.cpp b/thorlcr/master/mawatchdog.cpp index 1e6c49a5165..153fe2e34b5 100644 --- a/thorlcr/master/mawatchdog.cpp +++ b/thorlcr/master/mawatchdog.cpp @@ -176,8 +176,8 @@ unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer IWARNLOG("Receive Monitor Packet: wrong size, got %d, less than HeartBeatPacketHeader size", read); return 0; } - //Cast is to avoid warning about writing to an object with non trivial copy assignment - memcpy(reinterpret_cast(&hb), mb.readDirect(sizeof(HeartBeatPacketHeader)), sizeof(HeartBeatPacketHeader)); + + hb.deserialize(mb); if (read != hb.packetSize) // check for corrupt packets { IWARNLOG("Receive Monitor Packet: wrong size, expected %d, got %d", hb.packetSize, read); diff --git a/thorlcr/msort/tsortm.cpp b/thorlcr/msort/tsortm.cpp index e093c5d70f6..d2c78eca91f 100644 --- a/thorlcr/msort/tsortm.cpp +++ b/thorlcr/msort/tsortm.cpp @@ -179,27 +179,18 @@ inline byte *dupb(byte *b,size32_t l) struct PartitionInfo { - size32_t guard; - Linked prowif; PartitionInfo(CActivityBase *_activity, IThorRowInterfaces *rowif) : splitkeys(*_activity, rowif, ers_allow), prowif(rowif) { - nodes = NULL; - mpports = NULL; guard = rowif?rowif->queryRowMetaData()->getMinRecordSize():(size32_t)-1; } ~PartitionInfo() { - free(nodes); + delete [] nodes; free(mpports); } - unsigned numnodes; - SocketEndpoint *nodes; - unsigned short *mpports; - mptag_t mpTagRPC; - CThorExpandingRowArray splitkeys; void init() { nodes = NULL; @@ -209,7 +200,7 @@ struct PartitionInfo } void kill() { - free(nodes); + delete [] nodes; free(mpports); init(); } @@ -218,6 +209,15 @@ struct PartitionInfo // should be more defensive here return (numnodes!=0)&&(splitkeys.ordinality()!=0); } + + + Linked prowif; + size32_t guard; + unsigned numnodes; + SocketEndpoint *nodes = nullptr; + unsigned short *mpports = nullptr; + mptag_t mpTagRPC; + CThorExpandingRowArray splitkeys; }; @@ -353,10 +353,10 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface estrecsize = 100; if (!partitioninfo) partitioninfo = new PartitionInfo(activity, keyIf); - free(partitioninfo->nodes); + delete [] partitioninfo->nodes; free(partitioninfo->mpports); partitioninfo->numnodes=numnodes; - partitioninfo->nodes=(SocketEndpoint *)malloc(numnodes*sizeof(SocketEndpoint)); + partitioninfo->nodes = new SocketEndpoint[numnodes]; partitioninfo->mpports=(unsigned short *)malloc(numnodes*sizeof(unsigned short)); partitioninfo->mpTagRPC = slaves.item(0).mpTagRPC; // NB all same @@ -1191,8 +1191,8 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface } timer.stop("Calculating split map"); } - OwnedMalloc endpoints(numnodes); - SocketEndpoint *epp = endpoints; + std::unique_ptr endpoints(new SocketEndpoint[numnodes]); + SocketEndpoint *epp = endpoints.get(); for (i=0;i, implement hb.sender = self; hb.tick++; size32_t progressSizePos = (byte *)&hb.progressSize - (byte *)&hb; - sendMb.append(sizeof(HeartBeatPacketHeader), &hb); + hb.serialize(sendMb); hb.progressSize = gatherData(progressMb); sendMb.writeDirect(progressSizePos, sizeof(hb.progressSize), &hb.progressSize);