Skip to content

Commit

Permalink
Merge pull request #17768 from ghalliday/preserveHostname
Browse files Browse the repository at this point in the history
HPCC-18382 Add hostname to IpAddress

Reviewed-by: Mark Kelly [email protected]
Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Sep 15, 2023
2 parents 411d7f6 + 38a3844 commit 9c7ef25
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 66 deletions.
21 changes: 13 additions & 8 deletions dali/base/dafdesc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "jsecrets.hpp"
#include "rmtfile.hpp"

#include <memory>

#define INCLUDE_1_OF_1 // whether to use 1_of_1 for single part files

#define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite
Expand Down Expand Up @@ -944,35 +946,38 @@ void getClusterInfo(IPropertyTree &pt, INamedGroupStore *resolver, unsigned flag
cgroup.setown(resolver->lookup(grp));
// get nodes from parts if complete (and group 0)
if (gi==0) { // don't assume lookup name correct!
SocketEndpoint *eps = (SocketEndpoint *)calloc(np?np:1,sizeof(SocketEndpoint));
MemoryBuffer mb;
Owned<IPropertyTreeIterator> piter;
if (pt.getPropBin("Parts",mb))
piter.setown(deserializePartAttrIterator(mb));
else
piter.setown(pt.getElements("Part"));

ForEach(*piter) {
IPropertyTree &cpt = piter->query();
unsigned num = cpt.getPropInt("@num");
if (num>np) {
eps = (SocketEndpoint *)checked_realloc(eps,num*sizeof(SocketEndpoint),np*sizeof(SocketEndpoint),-21);
memset(eps+np,0,(num-np)*sizeof(SocketEndpoint));
if (num>np)
np = num;
}
}

std::unique_ptr<SocketEndpoint[]> eps(new SocketEndpoint[np?np:1]);
ForEach(*piter) {
IPropertyTree &cpt = piter->query();
unsigned num = cpt.getPropInt("@num");
const char *node = cpt.queryProp("@node");
if (node&&*node)
eps[num-1].set(node);
eps.get()[num-1].set(node);
}
unsigned i=0;
for (i=0;i<np;i++)
if (eps[i].isNull())
break;
if (i==np) {
Owned<IGroup> ngrp = createIGroup(np,eps);
Owned<IGroup> ngrp = createIGroup(np,eps.get());
if (!cgroup.get()||(ngrp->compare(cgroup)!=GRbasesubset))
cgroup.setown(ngrp.getClear());
}
free(eps);

}
ClusterPartDiskMapSpec mspec;
IClusterInfo *cluster = createClusterInfo(grp,cgroup,mspec,resolver);
Expand Down
52 changes: 22 additions & 30 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class CSocket: public ISocket, public CInterface
friend class CSocketConnectWait;
enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state;
T_SOCKET sock;
char* hostname; // host address
// char* hostname; // host address
unsigned short hostport; // host port
unsigned short localPort;
SOCKETMODE sockmode;
Expand Down Expand Up @@ -968,7 +968,7 @@ size32_t CSocket::avail_read()

int CSocket::pre_connect (bool block)
{
if (NULL == hostname || '\0' == (*hostname))
if (targetip.isNull())
{
StringBuffer err;
err.appendf("CSocket::pre_connect - Invalid/missing host IP address raised in : %s, line %d",sanitizeSourceFile(__FILE__), __LINE__);
Expand All @@ -977,10 +977,6 @@ int CSocket::pre_connect (bool block)
}

DEFINE_SOCKADDR(u);
if (targetip.isNull()) {
set_return_addr(hostport,hostname);
targetip.ipset(returnep);
}
socklen_t ul = setSockAddr(u,targetip,hostport);
sock = ::socket(u.sa.sa_family, SOCK_STREAM, targetip.isIp4()?0:PF_INET6);
owned = true;
Expand Down Expand Up @@ -1082,11 +1078,7 @@ void CSocket::open(int listen_queue_size,bool reuseports)

DEFINE_SOCKADDR(u);
socklen_t ul;
if (hostname) {
if (targetip.isNull()) {
set_return_addr(hostport,hostname);
targetip.ipset(returnep);
}
if (!targetip.isNull()) {
ul = setSockAddr(u,targetip,hostport);
}
else
Expand Down Expand Up @@ -1705,6 +1697,8 @@ void CSocket::setTraceName(const char * prefix, const char * name)
void CSocket::setTraceName()
{
#ifdef _TRACE
StringBuffer hostname;
targetip.getIpText(hostname);
setTraceName("C!", hostname);
#endif
}
Expand All @@ -1724,10 +1718,6 @@ ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems)
void CSocket::udpconnect()
{
DEFINE_SOCKADDR(u);
if (targetip.isNull()) {
set_return_addr(hostport,hostname);
targetip.ipset(returnep);
}
socklen_t ul = setSockAddr(u,targetip,hostport);
sock = ::socket(u.sa.sa_family, SOCK_DGRAM, targetip.isIp4()?0:PF_INET6);
#ifdef SOCKTRACE
Expand Down Expand Up @@ -1759,7 +1749,9 @@ int CSocket::logPollError(unsigned revents, const char *rwstr)
if (revents & POLLERR)
{
StringBuffer errStr;
errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, (hostname?hostname:"NULL"), hostport);
StringBuffer hostname;
targetip.getIpText(hostname);
errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, hostname.str(), hostport);
int serror = 0;
socklen_t serrlen = sizeof(serror);
int srtn = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&serror, &serrlen);
Expand Down Expand Up @@ -2874,7 +2866,9 @@ void CSocket::set_ttl(unsigned _ttl)

void CSocket::logConnectionInfo(unsigned timeoutms, unsigned conn_mstime)
{
PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, (hostname?hostname:"NULL"), hostport);
StringBuffer hostname;
targetip.getIpText(hostname);
PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, hostname.str(), hostport);
// PrintStackReport();
}

Expand Down Expand Up @@ -2921,8 +2915,6 @@ CSocket::~CSocket()
e->Release();
}
}
free(hostname);
hostname = NULL;
#ifdef _TRACE
free(tracename);
tracename = NULL;
Expand All @@ -2939,8 +2931,8 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
#endif
nagling = true; // until turned off
hostport = ep.port;
targetip.ipset(ep);
localPort = 0;
hostname = NULL;
mcastreq = NULL;
#ifdef _TRACE
tracename = NULL;
Expand All @@ -2950,9 +2942,9 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
mcastreq = new MCASTREQ(name);
}
else {
if (!name&&!ep.isNull())
name = ep.getIpText(tmp).str();
hostname = name?strdup(name):NULL;
//MORE: I don't think the name parameter is needed anymore
if (name && ep.isNull())
targetip.ipset(name);
}
sock = INVALID_SOCKET;
sockmode = smode;
Expand All @@ -2966,9 +2958,7 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
else
{
StringBuffer hostname;
SocketEndpoint self;
self.setLocalHost(0);
self.getUrlStr(hostname);
targetip.getIpText(hostname);
setTraceName("S>", hostname.str());
}
#endif
Expand Down Expand Up @@ -2998,7 +2988,7 @@ CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
//set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data
char peer[256];
hostport = peer_name(peer,sizeof(peer));
hostname = strdup(peer);
targetip.ipset(peer);
SocketEndpoint ep;
localPort = getEndpoint(ep).port;
#ifdef _TRACE
Expand Down Expand Up @@ -3608,9 +3598,11 @@ bool IpAddress::ipset(const char *text)
{
if (text&&*text) {
if ((text[0]=='.')&&(text[1]==0)) {
hostname.set(GetCachedHostName()); // Is this better than '.'?
ipset(queryHostIP());
return true;
}
hostname.set(text);
if (decodeNumericIP(text,netaddr))
return true;
const char *s;
Expand All @@ -3623,6 +3615,7 @@ bool IpAddress::ipset(const char *text)
return true;
}
memset(&netaddr,0,sizeof(netaddr));
hostname.clear();
return false;
}

Expand All @@ -3646,6 +3639,8 @@ inline char * addbyte(char *s,byte b)

StringBuffer & IpAddress::getIpText(StringBuffer & out) const
{
if (hostname)
return out.append(hostname);
if (::isIp4(netaddr)) {
const byte *ip = (const byte *)&netaddr[3];
char ips[16];
Expand Down Expand Up @@ -7333,6 +7328,3 @@ extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket)
e->Release();
}
}

static_assert(sizeof(IpAddress) == 16, "check size of IpAddress");
static_assert(sizeof(SocketEndpoint) == 20, "check size of SocketEndpoint");
11 changes: 3 additions & 8 deletions system/jlib/jsocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ enum JSOCKET_ERROR_CODES {
class jlib_decl IpAddress
{
unsigned netaddr[4] = { 0, 0, 0, 0 };
StringAttr hostname; // not currently serialized
public:
IpAddress() = default;
IpAddress(const IpAddress& other) { ipset(other); }
explicit IpAddress(const char *text) { ipset(text); }

bool ipset(const char *text); // sets to NULL if fails or text=NULL
void ipset(const IpAddress& other) { memcpy(&netaddr,&other.netaddr,sizeof(netaddr)); }
void ipset(const IpAddress& other) { *this = other; }
bool ipequals(const IpAddress & other) const;
int ipcompare(const IpAddress & other) const; // depreciated
unsigned iphash(unsigned prev=0) const;
Expand All @@ -111,14 +111,9 @@ class jlib_decl IpAddress

size32_t getNetAddress(size32_t maxsz,void *dst) const; // for internal use - returns 0 if address doesn't fit
void setNetAddress(size32_t sz,const void *src); // for internal use
const char * queryHostname() const { return hostname.get(); }

inline bool operator == ( const IpAddress & other) const { return ipequals(other); }
inline IpAddress & operator = ( const IpAddress &other )
{
ipset(other);
return *this;
}

};

struct IpComparator
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/master/mawatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer
IWARNLOG("Receive Monitor Packet: wrong size, got %d, less than HeartBeatPacketHeader size", read);
return 0;
}
//Cast is to avoid warning about writing to an object with non trivial copy assignment
memcpy(reinterpret_cast<void *>(&hb), mb.readDirect(sizeof(HeartBeatPacketHeader)), sizeof(HeartBeatPacketHeader));

hb.deserialize(mb);
if (read != hb.packetSize) // check for corrupt packets
{
IWARNLOG("Receive Monitor Packet: wrong size, expected %d, got %d", hb.packetSize, read);
Expand Down
34 changes: 17 additions & 17 deletions thorlcr/msort/tsortm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,18 @@ inline byte *dupb(byte *b,size32_t l)

struct PartitionInfo
{
size32_t guard;
Linked<IThorRowInterfaces> prowif;
PartitionInfo(CActivityBase *_activity, IThorRowInterfaces *rowif)
: splitkeys(*_activity, rowif, ers_allow), prowif(rowif)
{
nodes = NULL;
mpports = NULL;
guard = rowif?rowif->queryRowMetaData()->getMinRecordSize():(size32_t)-1;
}

~PartitionInfo()
{
free(nodes);
delete [] nodes;
free(mpports);
}

unsigned numnodes;
SocketEndpoint *nodes;
unsigned short *mpports;
mptag_t mpTagRPC;
CThorExpandingRowArray splitkeys;
void init()
{
nodes = NULL;
Expand All @@ -209,7 +200,7 @@ struct PartitionInfo
}
void kill()
{
free(nodes);
delete [] nodes;
free(mpports);
init();
}
Expand All @@ -218,6 +209,15 @@ struct PartitionInfo
// should be more defensive here
return (numnodes!=0)&&(splitkeys.ordinality()!=0);
}


Linked<IThorRowInterfaces> prowif;
size32_t guard;
unsigned numnodes;
SocketEndpoint *nodes = nullptr;
unsigned short *mpports = nullptr;
mptag_t mpTagRPC;
CThorExpandingRowArray splitkeys;
};


Expand Down Expand Up @@ -353,10 +353,10 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
estrecsize = 100;
if (!partitioninfo)
partitioninfo = new PartitionInfo(activity, keyIf);
free(partitioninfo->nodes);
delete [] partitioninfo->nodes;
free(partitioninfo->mpports);
partitioninfo->numnodes=numnodes;
partitioninfo->nodes=(SocketEndpoint *)malloc(numnodes*sizeof(SocketEndpoint));
partitioninfo->nodes = new SocketEndpoint[numnodes];
partitioninfo->mpports=(unsigned short *)malloc(numnodes*sizeof(unsigned short));
partitioninfo->mpTagRPC = slaves.item(0).mpTagRPC; // NB all same

Expand Down Expand Up @@ -1191,8 +1191,8 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
}
timer.stop("Calculating split map");
}
OwnedMalloc<SocketEndpoint> endpoints(numnodes);
SocketEndpoint *epp = endpoints;
std::unique_ptr<SocketEndpoint[]> endpoints(new SocketEndpoint[numnodes]);
SocketEndpoint *epp = endpoints.get();
for (i=0;i<numnodes;i++)
{
CSortNode &slave = slaves.item(i);
Expand Down Expand Up @@ -1333,9 +1333,9 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
char url[100];
slave.endpoint.getUrlStr(url,sizeof(url));
if (splitMapUpper)
slave.MultiMergeBetween(total, numnodes*numnodes,splitMap,splitMapUpper,numnodes,endpoints);
slave.MultiMergeBetween(total, numnodes*numnodes,splitMap,splitMapUpper,numnodes,endpoints.get());
else
slave.MultiMerge(total, numnodes*numnodes,splitMap,numnodes,endpoints);
slave.MultiMerge(total, numnodes*numnodes,splitMap,numnodes,endpoints.get());
// ActPrintLog(activity, "Merge %d started: %d rows on %s",i,tot[i],url);
}
}
Expand Down
13 changes: 13 additions & 0 deletions thorlcr/shared/thwatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,23 @@

struct HeartBeatPacketHeader
{
public:
size32_t packetSize = 0; // used as validity check must be first
SocketEndpoint sender;
unsigned tick = 0; // sequence check
size32_t progressSize = 0; // size of progress data (following performance data)

public:
void serialize(MemoryBuffer & out) const
{
out.append(packetSize).append(tick).append(progressSize);
sender.serialize(out);
}
void deserialize(MemoryBuffer & in)
{
in.read(packetSize).read(tick).read(progressSize);
sender.deserialize(in);
}
};

#endif
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/slave/slwatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class CGraphProgressHandlerBase : public CInterfaceOf<ISlaveWatchdog>, implement
hb.sender = self;
hb.tick++;
size32_t progressSizePos = (byte *)&hb.progressSize - (byte *)&hb;
sendMb.append(sizeof(HeartBeatPacketHeader), &hb);
hb.serialize(sendMb);

hb.progressSize = gatherData(progressMb);
sendMb.writeDirect(progressSizePos, sizeof(hb.progressSize), &hb.progressSize);
Expand Down

0 comments on commit 9c7ef25

Please sign in to comment.