Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Mar 28, 2024
2 parents c14483b + bc2cf78 commit 839841f
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 21 deletions.
60 changes: 56 additions & 4 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ class Url : public CInterface, implements IInterface
}
};

//If Url were globally accessible, this could be defined in jtrace instead.
//HTTP span conventions are documented here: https://opentelemetry.io/docs/specs/semconv/http/http-spans/
// Annotates a client span with HTTP/network attributes describing the target url.
//   clientSpan - span to annotate; when null the call does nothing.
//   url        - supplies the host, port and method values that are recorded.
void setSpanURLAttributes(ISpan * clientSpan, const Url & url)
{
    // Without a span there is nothing to annotate.
    if (!clientSpan)
        return;

    // The request method is always reported as POST from here.
    // NOTE(review): flagged by the original author as "apparently hardcoded" - a "service"
    // and a "method" are tracked elsewhere, the target service name is sometimes used, and
    // other code and comments suggest only GET is supported. Confirm which verb is correct.
    clientSpan->setSpanAttribute("http.request.method", "POST");

    // Peer endpoint details come straight from the supplied Url.
    clientSpan->setSpanAttribute("network.peer.address", url.host.get());
    clientSpan->setSpanAttribute("network.peer.port", url.port);

    // url.method is what the caller compares against "https" when choosing the protocol,
    // so it is reported here as the protocol name.
    clientSpan->setSpanAttribute("network.protocol.name", url.method.get());
}

typedef IArrayOf<Url> UrlArray;

//=================================================================================================
Expand Down Expand Up @@ -963,12 +981,14 @@ class CWSCHelper : implements IWSCHelper, public CInterface
WSCType wscType;

public:
Owned<ISpan> activitySpanScope;
IMPLEMENT_IINTERFACE;

CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert,
const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
: logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
{
activitySpanScope.setown(logctx.queryActiveSpan()->createInternalSpan(_wscType == STsoap ? "SoapCall Activity": "HTTPCall Activity"));
wscMode = _wscMode;
wscType = _wscType;
done = 0;
Expand Down Expand Up @@ -1034,7 +1054,6 @@ class CWSCHelper : implements IWSCHelper, public CInterface
s.setown(helper->getXpathHintsXml());
xpathHints.setown(createPTreeFromXMLString(s.get()));
}

if (wscType == STsoap)
{
soapaction.set(s.setown(helper->getSoapAction()));
Expand Down Expand Up @@ -1972,7 +1991,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
#endif
Owned<IProperties> traceHeaders = master->logctx.getClientHeaders();
Owned<IProperties> traceHeaders = ::getClientHeaders(master->activitySpanScope);
if (traceHeaders)
{
Owned<IPropertyIterator> iter = traceHeaders->getIterator();
Expand Down Expand Up @@ -2457,6 +2476,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ...
if (strieq(url.method, "https"))
proto = PersistentProtocol::ProtoTLS;

bool shouldClose = false;
Owned<ISocket> psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr;
if (psock)
Expand Down Expand Up @@ -2504,13 +2524,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
if (master->timeLimitExceeded)
{
master->activitySpanScope->recordError(SpanError("Time Limit Exceeded", e->errorCode(), true, true));
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
processException(url, inputRows, e);
return;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
throw;
Expand All @@ -2523,19 +2545,25 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
idx = 0;
if (idx==startidx)
{
master->activitySpanScope->recordException(e, true, true);
StringBuffer s;
master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
processException(url, inputRows, e);
return;
}
} while (blacklist->blacklisted(url.port, url.host));

master->activitySpanScope->recordException(e, false, false); //Record the exception, but don't set failure
}
}
try
{
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
OwnedSpanScope socketOperationSpan = master->activitySpanScope->createClientSpan("Socket Write");
setSpanURLAttributes(socketOperationSpan, url);
socket->write(request.str(), request.length());

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: sent request (%s) to %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2544,23 +2572,30 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
bool keepAlive2;
StringBuffer contentType;
int rval = readHttpResponse(response, socket, keepAlive2, contentType);
socketOperationSpan->setSpanAttribute("http.response.status_code", (int64_t)rval);
keepAlive = keepAlive && keepAlive2;

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: received response (%s) from %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);

if (rval != 200)
{
socketOperationSpan->setSpanStatusSuccess(false);
if (rval == 503)
{
socketOperationSpan->recordError(SpanError("Server Too Busy", 1001, true, true));
throw new ReceivedRoxieException(1001, "Server Too Busy");
}

StringBuffer text;
text.appendf("HTTP error (%d) in processQuery",rval);
rtlAddExceptionTag(text, "soapresponse", response.str());
socketOperationSpan->recordError(SpanError(text.str(), -1, true, true));
throw MakeStringExceptionDirect(-1, text.str());
}
if (response.length() == 0)
{
socketOperationSpan->recordError(SpanError("Zero length response in processQuery", -1, true, true));
throw MakeStringException(-1, "Zero length response in processQuery");
}
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2575,6 +2610,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
else if (keepAlive)
persistentHandler->add(socket, &ep, proto);
}

socketOperationSpan->setSpanStatusSuccess(true);
break;
}
catch (IReceivedRoxieException *e)
Expand Down Expand Up @@ -2606,6 +2643,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
processException(url, e->errorRow(), e);
else
processException(url, inputRows, e);

master->activitySpanScope->recordException(e, true, true);
break;
}
}
Expand All @@ -2616,14 +2655,17 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (master->timeLimitExceeded)
{
processException(url, inputRows, e);
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
VStringBuffer msg("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
break;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
throw;
}

Expand All @@ -2634,26 +2676,36 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (numRetries >= master->maxRetries)
{
// error affects all inputRows
master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries);
VStringBuffer msg("Exiting: maxRetries %d exceeded", master->maxRetries);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
processException(url, inputRows, e);
break;
}
numRetries++;
master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries);
master->activitySpanScope->recordException(e, false, false);
e->Release();
}
catch (std::exception & es)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if(dynamic_cast<std::bad_alloc *>(&es))
{
master->activitySpanScope->recordError("std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
}

master->activitySpanScope->recordError(es.what());
throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
}
catch (...)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);

master->activitySpanScope->recordError(SpanError("Unknown exception in processQuery", -1, true, true));
throw MakeStringException(-1, "Unknown exception in processQuery");
}
}
Expand Down
23 changes: 23 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4846,6 +4846,7 @@ class CLocalWUException : implements IWUException, public CInterface
virtual unsigned getSequence() const override;
virtual const char * queryScope() const override;
virtual unsigned getPriority() const override;
virtual double getCost() const override;
virtual void setExceptionSource(const char *str) override;
virtual void setExceptionMessage(const char *str) override;
virtual void setExceptionCode(unsigned code) override;
Expand All @@ -4857,6 +4858,7 @@ class CLocalWUException : implements IWUException, public CInterface
virtual void setActivityId(unsigned _id) override;
virtual void setScope(const char * _scope) override;
virtual void setPriority(unsigned _priority) override;
virtual void setCost(double cost) override;
};

//==========================================================================================
Expand Down Expand Up @@ -7629,6 +7631,18 @@ WUPriorityClass CLocalWorkUnit::getPriority() const
return (WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses);
}

// Stores the supplied cost as the "@cost" real-valued property of this workunit's tree.
void CLocalWorkUnit::setCost(double cost)
{
    // crit is held for the duration of the property update.
    CriticalBlock guard(crit);
    p->setPropReal("@cost", cost);
}

// Returns the "@cost" real-valued property of this workunit's tree, or 0 when it is not set.
double CLocalWorkUnit::getCost() const
{
    // crit is held while the property is read.
    CriticalBlock guard(crit);
    const double storedCost = p->getPropReal("@cost", 0);
    return storedCost;
}

const char *CLocalWorkUnit::queryPriorityDesc() const
{
return getEnumText(getPriority(), priorityClasses);
Expand Down Expand Up @@ -11926,6 +11940,11 @@ unsigned CLocalWUException::getPriority() const
return p->getPropInt("@prio", 0);
}

// Returns the "@cost" real-valued property recorded on this exception, or 0 when it is not set.
double CLocalWUException::getCost() const
{
    const double recordedCost = p->getPropReal("@cost", 0);
    return recordedCost;
}

void CLocalWUException::setExceptionSource(const char *str)
{
p->setProp("@source", str);
Expand Down Expand Up @@ -11981,6 +12000,10 @@ void CLocalWUException::setPriority(unsigned _priority)
p->setPropInt("@prio", _priority);
}

// Records _cost as the "@cost" real-valued property on this exception's property tree.
// The matching getCost() reads the same property and defaults to 0 when it is absent.
void CLocalWUException::setCost(double _cost)
{
p->setPropReal("@cost", _cost);
}
//==========================================================================================

CLocalWUAppValue::CLocalWUAppValue(const IPropertyTree *_owner, const IPropertyTree *_props) : owner(_owner), props(_props)
Expand Down
2 changes: 2 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ interface IConstWUException : extends IInterface
virtual unsigned getActivityId() const = 0;
virtual const char * queryScope() const = 0;
virtual unsigned getPriority() const = 0; // For ordering within a severity - e.g. warnings about inefficiency
virtual double getCost() const = 0; // cost optimizer cost saving estimate.
};


Expand All @@ -536,6 +537,7 @@ interface IWUException : extends IConstWUException
virtual void setActivityId(unsigned _id) = 0;
virtual void setScope(const char * _scope) = 0;
virtual void setPriority(unsigned _priority) = 0;
virtual void setCost(double cost) = 0; // cost optimizer cost saving estimate.
};


Expand Down
2 changes: 2 additions & 0 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public:
virtual IConstWUPluginIterator & getPlugins() const;
virtual IConstWULibraryIterator & getLibraries() const;
virtual WUPriorityClass getPriority() const;
virtual double getCost() const;
virtual const char *queryPriorityDesc() const;
virtual int getPriorityLevel() const;
virtual int getPriorityValue() const;
Expand Down Expand Up @@ -328,6 +329,7 @@ public:
void setDebugValueInt(const char * propname, int value, bool overwrite);
void setJobName(const char * value);
void setPriority(WUPriorityClass cls);
void setCost(double cost);
void setPriorityLevel(int level);
void setRescheduleFlag(bool value);
void setResultLimit(unsigned value);
Expand Down
4 changes: 2 additions & 2 deletions common/wuanalysis/anacommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
we->setExceptionFileName(filename);
StringBuffer s(comment); // Append scope to comment as scope column is not visible in ECLWatch
s.appendf(" (%s)", scope.str());
we->setExceptionMessage(s.str());
if (costRate!=0.0)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
s.appendf(" cost %.2f", timePenaltyPerHour*costRate);
we->setCost(timePenaltyPerHour*costRate);
}
we->setExceptionMessage(s.str());
we->setExceptionSource(CostOptimizerName);
}

Expand Down
14 changes: 14 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10525,6 +10525,18 @@ class CInitGroups
}
return true;
}
void clearLZGroups()
{
if (!writeLock)
throw makeStringException(0, "CInitGroups::clearLZGroups called in read-only mode");
IPropertyTree *root = groupsconnlock.conn->queryRoot();
std::vector<IPropertyTree *> toDelete;
Owned<IPropertyTreeIterator> groups = root->getElements("Group[@kind='dropzone']");
ForEach(*groups)
toDelete.push_back(&groups->query());
for (auto &group: toDelete)
root->removeTree(group);
}
void constructGroups(bool force, StringBuffer &messages, IPropertyTree *oldEnvironment)
{
Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
Expand Down Expand Up @@ -10717,12 +10729,14 @@ class CInitGroups
// Rebuilds the cluster groups: existing dropzone (LZ) groups are removed first, then
// constructGroups() is run with the caller's force, response and oldEnvironment arguments.
//   force, oldEnvironment - passed through unchanged to constructGroups().
//   response              - passed to constructGroups() as its messages buffer.
//   timems                - passed to the CInitGroups constructor.
//                           NOTE(review): presumably a lock/connect timeout in ms - confirm.
void initClusterGroups(bool force, StringBuffer &response, IPropertyTree *oldEnvironment, unsigned timems)
{
// NOTE(review): the second argument (true) presumably requests the write lock, which
// clearLZGroups() requires (it throws when writeLock is not set) - confirm.
CInitGroups init(timems, true);
init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated
init.constructGroups(force, response, oldEnvironment);
}

void initClusterAndStoragePlaneGroups(bool force, IPropertyTree *oldEnvironment, unsigned timems)
{
CInitGroups init(timems, true);
init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated

StringBuffer response;
init.constructGroups(force, response, oldEnvironment);
Expand Down
4 changes: 4 additions & 0 deletions dali/ft/daftformat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,10 @@ void CRemotePartitioner::callRemote()

void CRemotePartitioner::getResults(PartitionPointArray & partition)
{
#ifdef RUN_SLAVES_ON_THREADS
join();
#endif

if (error)
throw error.getLink();

Expand Down
8 changes: 8 additions & 0 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,8 @@ void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorI
if (alldone)
break;
}
for (idx = 0; idx < numThreads; idx++)
threads.item(idx).join();
for (idx = 0; idx < numThreads; idx++)
threads.item(idx).queryThrowError();
}
Expand Down Expand Up @@ -2564,6 +2566,12 @@ void FileSprayer::performTransfer()
numSlavesCompleted++;
}

#ifdef RUN_SLAVES_ON_THREADS
//Ensure that the transfer slave threads have terminated before continuing
ForEachItemIn(idx4, transferSlaves)
transferSlaves.item(idx4).join();
#endif

if (error)
throw LINK(error);

Expand Down
1 change: 1 addition & 0 deletions esp/scm/ws_workunits_struct.ecm
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ ESPStruct [nil_remove] ECLException
[min_ver("1.63")] int Activity;
[min_ver("1.69")] string Scope;
[min_ver("1.69")] int Priority;
[min_ver("1.70")] double Cost;
};
// ===========================================================================
ESPStruct [nil_remove] ECLSchemaItem
Expand Down
Loading

0 comments on commit 839841f

Please sign in to comment.