Skip to content

Commit

Permalink
HPCC-31990 Add timeout to DNS lookups for soapcalls 2
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Jul 18, 2024
1 parent eb0a0b7 commit b3a1885
Showing 1 changed file with 58 additions and 100 deletions.
158 changes: 58 additions & 100 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,35 +382,38 @@ enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_ser
# endif
#endif

static CriticalSection queryKACS;
enum UseUDE { UNINIT, INITED };
static std::atomic<UseUDE> expertTCPSettings { UNINIT };
static CriticalSection queryTCPCS;

enum UseUDE { UNINIT, DISABLED, ENABLED };
static std::atomic<UseUDE> doKeepAlive { UNINIT };
static bool hasKeepAlive = false;
static int keepAliveTime = -1;
static int keepAliveInterval = -1;
static int keepAliveProbes = -1;
static bool disableDNSTimeout = false;

/*
<Software>
<Globals>
<Globals disableDNSTimeout="false">
<keepalive time="200" interval="75" probes="9"/>
</Globals>
global:
expert:
disableDNSTimeout: false
keepalive:
time: 200
interval: 75
probes: 9
*/

extern jlib_decl bool queryKeepAlive(int &time, int &intvl, int &probes)
static void queryTCPSettings()
{
UseUDE state = doKeepAlive.load();
UseUDE state = expertTCPSettings.load();
if (state == UNINIT)
{
CriticalBlock block(queryKACS);
state = doKeepAlive.load();
CriticalBlock block(queryTCPCS);
state = expertTCPSettings.load();
if (state == UNINIT)
{
#ifdef _CONTAINERIZED
Expand All @@ -437,7 +440,6 @@ extern jlib_decl bool queryKeepAlive(int &time, int &intvl, int &probes)
catch (...)
{
}
state = DISABLED;
if (expert)
{
IPropertyTree *keepalive = expert->queryPropTree("keepalive");
Expand All @@ -446,22 +448,25 @@ extern jlib_decl bool queryKeepAlive(int &time, int &intvl, int &probes)
keepAliveTime = keepalive->getPropInt("@time", keepAliveTime);
keepAliveInterval = keepalive->getPropInt("@interval", keepAliveInterval);
keepAliveProbes = keepalive->getPropInt("@probes", keepAliveProbes);
state = ENABLED;
hasKeepAlive = true;
}
disableDNSTimeout = expert->getPropBool("@disableDNSTimeout", false);
}
doKeepAlive = state;
expertTCPSettings = INITED;
}
}
}

if (state == ENABLED)
extern jlib_decl bool queryKeepAlive(int &time, int &intvl, int &probes)
{
queryTCPSettings();
if (hasKeepAlive)
{
time = keepAliveTime;
intvl = keepAliveInterval;
probes = keepAliveProbes;
return true;
}
else
return false;
return hasKeepAlive;
}

static bool getAddressInfo(const char *name, unsigned *netaddr, bool okToLogErr);
Expand All @@ -473,32 +478,27 @@ static std::list<GetAddrInfoThread *> gaPtrList;

class GetAddrInfoThread : public Thread
{
char name[256];
StringAttr name;
Semaphore semait;
std::atomic<bool> started;
std::atomic<bool> ended;
unsigned netaddr[4];
bool result;
std::atomic<bool> ended{false};
unsigned netaddr[4] = { 0, 0, 0, 0 };
bool result = false;

public:
GetAddrInfoThread(const char *_name) : started(false), ended(false), result(false)
GetAddrInfoThread(const char *_name)
{
strncpy(name, _name, 255);
name[255] = '\0';
memset(netaddr, 0, sizeof(netaddr));
name.set(_name);
}

bool thrdHasEnded()
{
if (started)
return ended;
return false;
return ended;
}

bool waitms(unsigned timeoutms, unsigned *resAddr)
{
bool ret = semait.wait(timeoutms);
if (ret && ended)
if (ret)
{
if (result)
memcpy(resAddr, netaddr, sizeof(netaddr));
Expand All @@ -515,8 +515,7 @@ class GetAddrInfoThread : public Thread

virtual int run() override
{
started = true;
result = getAddressInfo(name, netaddr, false);
result = getAddressInfo(name.get(), netaddr, false);
ended = true;
semait.signal();
return 0;
Expand All @@ -527,27 +526,33 @@ class GetAddrInfoThread : public Thread
class AddrInfoReaperThread : public Thread
{
public:
std::atomic<bool> stopped;
std::atomic<bool> stopped{false};
std::atomic<bool> running{false};
Semaphore sem;

AddrInfoReaperThread() : stopped(true)
AddrInfoReaperThread()
{
}

void stop()
{
stopped = true;
sem.signal();
join();
if (running)
{
CriticalBlock block(queryDNSCS);
gaPtrList.clear();
stopped = true;
sem.signal();
join();
running = false;
{
CriticalBlock block(queryDNSCS);
// TODO: should we wait a short while for any still running getaddrinfo threads to end ?
gaPtrList.clear();
}
}
}

virtual int run() override
{
stopped = false;
running = true;
while(!stopped)
{
{
Expand All @@ -560,9 +565,9 @@ class AddrInfoReaperThread : public Thread
while (iter != end)
{
GetAddrInfoThread *pItem = *iter;
if ( (pItem->thrdHasEnded()) && (pItem->join(20)) )
if ( (pItem->thrdHasEnded()) && (pItem->join(0)) )
{
// TODO: should we log failures ? (but not here inside CS)
// TODO: should we log lookup failures ? (but not here inside CS)
delete pItem;
iter = gaPtrList.erase(iter);
}
Expand All @@ -571,74 +576,32 @@ class AddrInfoReaperThread : public Thread
}
}
}
if (sem.wait(10))
if (sem.wait(500))
break;
}
return 0;
}

};

static std::atomic<UseUDE> disableDNSTimeout { UNINIT };

static Owned<AddrInfoReaperThread> addrinforeaperthrd;

static bool queryDNSTimeout()
static bool useDNSTimeout()
{
UseUDE state = disableDNSTimeout.load();
if (state == UNINIT)
queryTCPSettings();
if (!disableDNSTimeout)
{
CriticalBlock block(queryDNSCS);
state = disableDNSTimeout.load();
if (state == UNINIT)
if (!addrinforeaperthrd)
{
#ifdef _CONTAINERIZED
Owned<IPropertyTree> expert;
#else
Owned<IPropertyTree> envtree;
IPropertyTree *expert = nullptr;
#endif
try
{
#ifdef _CONTAINERIZED
expert.setown(getGlobalConfigSP()->getPropTree("expert"));
#else
// MCK - without this many components will not have a global prop
envtree.setown(getHPCCEnvironment());
if (envtree)
expert = envtree->queryPropTree("Software/Globals");
#endif
}
catch (IException *e)
{
e->Release();
}
catch (...)
{
}
state = ENABLED;
if (expert)
{
bool isDisabled = expert->getPropBool("@disableDNSTimeout", false);
if (isDisabled)
state = DISABLED;
}
disableDNSTimeout = state;

if (state == ENABLED)
CriticalBlock block(queryDNSCS);
if (!addrinforeaperthrd)
{
if (!addrinforeaperthrd)
{
addrinforeaperthrd.setown(new AddrInfoReaperThread());
addrinforeaperthrd->start(false);
}
addrinforeaperthrd.setown(new AddrInfoReaperThread());
addrinforeaperthrd->start(false);
}
}
}

if (state == ENABLED)
return true;

}
return false;
}

Expand Down Expand Up @@ -3601,7 +3564,6 @@ bool getAddressInfo(const char *name, unsigned *netaddr, bool okToLogErr)
static bool lookupHostAddress(const char *name, unsigned *netaddr, unsigned timeoutms=INFINITE)
{
// if IP4only or using MS V6 can only resolve IPv4 using
static bool recursioncheck = false; // needed to stop error message recursing
int retry=10;

#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
Expand All @@ -3613,11 +3575,7 @@ static bool lookupHostAddress(const char *name, unsigned *netaddr, unsigned time
hostent * entry = gethostbyname(name);
while (entry==NULL) {
if (retry--==0) {
if (!recursioncheck) {
recursioncheck = true;
LogErr(h_errno,1,"gethostbyname failed",__LINE__,name);
recursioncheck = false;
}
RecursionSafeLogErr(h_errno, 1, "gethostbyname failed", __LINE__, name);
return false;
}
{
Expand Down Expand Up @@ -3652,7 +3610,7 @@ static bool lookupHostAddress(const char *name, unsigned *netaddr, unsigned time

#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
bool ret = false;
if ( (timeoutms != INFINITE) && (queryDNSTimeout()) )
if ( (timeoutms != INFINITE) && (useDNSTimeout()) )
{
// NOTES:
// getaddrinfo_a() offers an async getaddrinfo method, but has some limitations and a possible mem leak
Expand All @@ -3664,7 +3622,7 @@ static bool lookupHostAddress(const char *name, unsigned *netaddr, unsigned time
try
{
getaddrthrd = new GetAddrInfoThread(name);
// NOTE: if at thread limit, start() might delay for several seconds, can we control this ?
// NOTE: if at os/system/shell thread limit, start() might take several seconds, can we control this ?
getaddrthrd->start(false);
}
catch (IException *e)
Expand Down

0 comments on commit b3a1885

Please sign in to comment.