Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.6.x

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Mar 28, 2024
2 parents 0ffa389 + ca74c00 commit bc2cf78
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 19 deletions.
60 changes: 56 additions & 4 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ class Url : public CInterface, implements IInterface
}
};

//if Url was globally accessible we could define this in jtrace instead
//http span standards documented here: https://opentelemetry.io/docs/specs/semconv/http/http-spans/
void setSpanURLAttributes(ISpan * clientSpan, const Url & url)
{
    // Tag a client span with HTTP/network attributes describing the target url.
    // A null span is tolerated and results in no work being done.
    if (!clientSpan)
        return;

    ISpan & span = *clientSpan;

    // The request method is always reported as "POST" (apparently hardcoded),
    // even though a "service" and a "method" are tracked, the target service
    // name is sometimes used, and there is code and commentary elsewhere
    // suggesting that only GET is supported...
    span.setSpanAttribute("http.request.method", "POST");

    // Remote endpoint details taken straight from the url.
    span.setSpanAttribute("network.peer.address", url.host.get());
    span.setSpanAttribute("network.peer.port", url.port);

    // NOTE(review): url.method is compared against "https" elsewhere in this
    // file, so it presumably carries the URL scheme rather than an HTTP verb - confirm.
    span.setSpanAttribute("network.protocol.name", url.method.get());
}

typedef IArrayOf<Url> UrlArray;

//=================================================================================================
Expand Down Expand Up @@ -963,12 +981,14 @@ class CWSCHelper : implements IWSCHelper, public CInterface
WSCType wscType;

public:
Owned<ISpan> activitySpanScope;
IMPLEMENT_IINTERFACE;

CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert,
const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
: logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
{
activitySpanScope.setown(logctx.queryActiveSpan()->createInternalSpan(_wscType == STsoap ? "SoapCall Activity": "HTTPCall Activity"));
wscMode = _wscMode;
wscType = _wscType;
done = 0;
Expand Down Expand Up @@ -1034,7 +1054,6 @@ class CWSCHelper : implements IWSCHelper, public CInterface
s.setown(helper->getXpathHintsXml());
xpathHints.setown(createPTreeFromXMLString(s.get()));
}

if (wscType == STsoap)
{
soapaction.set(s.setown(helper->getSoapAction()));
Expand Down Expand Up @@ -1972,7 +1991,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
#endif
Owned<IProperties> traceHeaders = master->logctx.getClientHeaders();
Owned<IProperties> traceHeaders = ::getClientHeaders(master->activitySpanScope);
if (traceHeaders)
{
Owned<IPropertyIterator> iter = traceHeaders->getIterator();
Expand Down Expand Up @@ -2457,6 +2476,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ...
if (strieq(url.method, "https"))
proto = PersistentProtocol::ProtoTLS;

bool shouldClose = false;
Owned<ISocket> psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr;
if (psock)
Expand Down Expand Up @@ -2504,13 +2524,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
if (master->timeLimitExceeded)
{
master->activitySpanScope->recordError(SpanError("Time Limit Exceeded", e->errorCode(), true, true));
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
processException(url, inputRows, e);
return;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
throw;
Expand All @@ -2523,19 +2545,25 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
idx = 0;
if (idx==startidx)
{
master->activitySpanScope->recordException(e, true, true);
StringBuffer s;
master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
processException(url, inputRows, e);
return;
}
} while (blacklist->blacklisted(url.port, url.host));

master->activitySpanScope->recordException(e, false, false); //Record the exception, but don't set failure
}
}
try
{
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
OwnedSpanScope socketOperationSpan = master->activitySpanScope->createClientSpan("Socket Write");
setSpanURLAttributes(socketOperationSpan, url);
socket->write(request.str(), request.length());

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: sent request (%s) to %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2544,23 +2572,30 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
bool keepAlive2;
StringBuffer contentType;
int rval = readHttpResponse(response, socket, keepAlive2, contentType);
socketOperationSpan->setSpanAttribute("http.response.status_code", (int64_t)rval);
keepAlive = keepAlive && keepAlive2;

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: received response (%s) from %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);

if (rval != 200)
{
socketOperationSpan->setSpanStatusSuccess(false);
if (rval == 503)
{
socketOperationSpan->recordError(SpanError("Server Too Busy", 1001, true, true));
throw new ReceivedRoxieException(1001, "Server Too Busy");
}

StringBuffer text;
text.appendf("HTTP error (%d) in processQuery",rval);
rtlAddExceptionTag(text, "soapresponse", response.str());
socketOperationSpan->recordError(SpanError(text.str(), -1, true, true));
throw MakeStringExceptionDirect(-1, text.str());
}
if (response.length() == 0)
{
socketOperationSpan->recordError(SpanError("Zero length response in processQuery", -1, true, true));
throw MakeStringException(-1, "Zero length response in processQuery");
}
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2575,6 +2610,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
else if (keepAlive)
persistentHandler->add(socket, &ep, proto);
}

socketOperationSpan->setSpanStatusSuccess(true);
break;
}
catch (IReceivedRoxieException *e)
Expand Down Expand Up @@ -2606,6 +2643,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
processException(url, e->errorRow(), e);
else
processException(url, inputRows, e);

master->activitySpanScope->recordException(e, true, true);
break;
}
}
Expand All @@ -2616,14 +2655,17 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (master->timeLimitExceeded)
{
processException(url, inputRows, e);
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
VStringBuffer msg("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
break;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
throw;
}

Expand All @@ -2634,26 +2676,36 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (numRetries >= master->maxRetries)
{
// error affects all inputRows
master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries);
VStringBuffer msg("Exiting: maxRetries %d exceeded", master->maxRetries);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
processException(url, inputRows, e);
break;
}
numRetries++;
master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries);
master->activitySpanScope->recordException(e, false, false);
e->Release();
}
catch (std::exception & es)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if(dynamic_cast<std::bad_alloc *>(&es))
{
master->activitySpanScope->recordError("std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
}

master->activitySpanScope->recordError(es.what());
throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
}
catch (...)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);

master->activitySpanScope->recordError(SpanError("Unknown exception in processQuery", -1, true, true));
throw MakeStringException(-1, "Unknown exception in processQuery");
}
}
Expand Down
14 changes: 14 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10525,6 +10525,18 @@ class CInitGroups
}
return true;
}
void clearLZGroups()
{
if (!writeLock)
throw makeStringException(0, "CInitGroups::clearLZGroups called in read-only mode");
IPropertyTree *root = groupsconnlock.conn->queryRoot();
std::vector<IPropertyTree *> toDelete;
Owned<IPropertyTreeIterator> groups = root->getElements("Group[@kind='dropzone']");
ForEach(*groups)
toDelete.push_back(&groups->query());
for (auto &group: toDelete)
root->removeTree(group);
}
void constructGroups(bool force, StringBuffer &messages, IPropertyTree *oldEnvironment)
{
Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
Expand Down Expand Up @@ -10717,12 +10729,14 @@ class CInitGroups
void initClusterGroups(bool force, StringBuffer &response, IPropertyTree *oldEnvironment, unsigned timems)
{
CInitGroups init(timems, true);
init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated
init.constructGroups(force, response, oldEnvironment);
}

void initClusterAndStoragePlaneGroups(bool force, IPropertyTree *oldEnvironment, unsigned timems)
{
CInitGroups init(timems, true);
init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated

StringBuffer response;
init.constructGroups(force, response, oldEnvironment);
Expand Down
4 changes: 4 additions & 0 deletions dali/ft/daftformat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,10 @@ void CRemotePartitioner::callRemote()

void CRemotePartitioner::getResults(PartitionPointArray & partition)
{
#ifdef RUN_SLAVES_ON_THREADS
join();
#endif

if (error)
throw error.getLink();

Expand Down
8 changes: 8 additions & 0 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,8 @@ void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorI
if (alldone)
break;
}
for (idx = 0; idx < numThreads; idx++)
threads.item(idx).join();
for (idx = 0; idx < numThreads; idx++)
threads.item(idx).queryThrowError();
}
Expand Down Expand Up @@ -2564,6 +2566,12 @@ void FileSprayer::performTransfer()
numSlavesCompleted++;
}

#ifdef RUN_SLAVES_ON_THREADS
//Ensure that the transfer slave threads have terminated before continuing
ForEachItemIn(idx4, transferSlaves)
transferSlaves.item(idx4).join();
#endif

if (error)
throw LINK(error);

Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ void ContextLogger::exportStatsToSpan(bool failed, stat_type elapsedNs, unsigned
{
if (activeSpan->isRecording())
{
activeSpan->setSpanStatus(failed);
activeSpan->setSpanStatusSuccess(!failed);
setSpanAttribute("time_elapsed", elapsedNs);

if (memused)
Expand Down
11 changes: 11 additions & 0 deletions system/jhtree/ctfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,17 @@ void CJHLegacySearchNode::load(CKeyHdr *_keyHdr, const void *rawData, offset_t _
else {
keyBuf = NULL;
expandedSize = 0;
if (hdr.nodeType == NodeBranch)
{
if ((hdr.leftSib == 0) && (hdr.rightSib == 0))
{
//Sanity check to catch an error where a section of the file has unexpectedly been zeroed,
//which is otherwise tricky to track down.
//This can only legally happen if there is an index with 0 entries.
if (keyHdr->getNumRecords() != 0)
throw MakeStringException(0, "Zeroed index node detected at offset %llu", getFpos());
}
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions system/jlib/jtrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,11 +753,11 @@ class CSpan : public CInterfaceOf<ISpan>
return span ? span->IsRecording() : false;
}

virtual void setSpanStatus(bool spanFailed, const char * statusMessage)
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage)
{
if (span != nullptr)
{
span->SetStatus(spanFailed ? opentelemetry::trace::StatusCode::kError : opentelemetry::trace::StatusCode::kOk, statusMessage);
span->SetStatus(spanSucceeded ? opentelemetry::trace::StatusCode::kOk : opentelemetry::trace::StatusCode::kError, statusMessage);
}
}

Expand Down Expand Up @@ -899,7 +899,7 @@ class CNullSpan final : public CInterfaceOf<ISpan>

virtual void recordException(IException * e, bool spanFailed, bool escapedScope) override {}
virtual void recordError(const SpanError & error) override {};
virtual void setSpanStatus(bool spanFailed, const char * statusMessage) override {}
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage) override {}

virtual const char* queryGlobalId() const override { return nullptr; }
virtual const char* queryCallerId() const override { return nullptr; }
Expand Down
8 changes: 4 additions & 4 deletions system/jlib/jtrace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ struct SpanError
: errorMessage(_errorMessage), errorCode(_errorCode), spanFailed(_spanFailed), escapeScope(_escapeScope) {}

/**
* @brief Sets the span status.
* @param _spanFailed Flag indicating whether the span failed.
* @brief Sets the span status success.
* @param _spanSucceeded Flag indicating whether the span succeeded.
* @param _spanScopeEscape Flag indicating whether the exception escaped the scope of the span.
*/
void setSpanStatus(bool _spanFailed, bool _spanScopeEscape) { spanFailed = _spanFailed; escapeScope = _spanScopeEscape;}
void setSpanStatusSuccess(bool _spanSucceeded, bool _spanScopeEscape) { spanFailed = !_spanSucceeded; escapeScope = _spanScopeEscape;}

/**
* @brief Sets the error message and error code.
Expand All @@ -123,7 +123,7 @@ interface ISpan : extends IInterface
virtual bool isRecording() const = 0; // Is it worth adding any events/attributes to this span?
virtual void recordException(IException * e, bool spanFailed = true, bool escapedScope = true) = 0;
virtual void recordError(const SpanError & error = SpanError()) = 0;
virtual void setSpanStatus(bool spanFailed, const char * statusMessage = NO_STATUS_MESSAGE) = 0;
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage = NO_STATUS_MESSAGE) = 0;

virtual ISpan * createClientSpan(const char * name) = 0;
virtual ISpan * createInternalSpan(const char * name) = 0;
Expand Down
9 changes: 2 additions & 7 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,12 @@ class JlibTraceTest : public CppUnit::TestFixture

{
OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrorSpanEscaped", emptyMockHTTPHeaders);
SpanError * error = new SpanError("hello");
error->setSpanStatus(true, true);
serverSpan->recordError(*error);
serverSpan->recordError(SpanError("hello", -1, true, true)); //error message hello, no error code, error caused failure, and error caused escape
}//{ "type": "span", "name": "failedErrorSpanEscaped", "trace_id": "634f386c18a6140544c980e0d5a15905", "span_id": "e2f59c48f63a8f82", "start": 1709675508231168974, "duration": 7731717678, "status": "Error", "kind": "Server", "description": "hello", "instrumented_library": "unittests", "events":[ { "name": "Exception", "time_stamp": 1709675512164430668, "attributes": {"escaped": 1,"message": "hello" } } ] }

{
OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrEscapedMsgErrCode", emptyMockHTTPHeaders);
SpanError * error = new SpanError();
error->setSpanStatus(true, true);
error->setError("hello", 34);
serverSpan->recordError(*error);
serverSpan->recordError(SpanError("hello", 34, true, true)); //error message hello, error code 34, error caused failure, and error caused escape
}//failedErrEscapedMsgErrCode

{
Expand Down

0 comments on commit bc2cf78

Please sign in to comment.