From 839bf57f9e4b3b062d9d061f78f7405a023b16b5 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Thu, 7 Mar 2024 16:02:03 +0000 Subject: [PATCH 1/4] HPCC-31420 Ensure threads are joined inside the dfu copy code Signed-off-by: Gavin Halliday --- dali/ft/daftformat.cpp | 4 ++++ dali/ft/filecopy.cpp | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/dali/ft/daftformat.cpp b/dali/ft/daftformat.cpp index 02040383302..cc8f74301f8 100644 --- a/dali/ft/daftformat.cpp +++ b/dali/ft/daftformat.cpp @@ -1974,6 +1974,10 @@ void CRemotePartitioner::callRemote() void CRemotePartitioner::getResults(PartitionPointArray & partition) { +#ifdef RUN_SLAVES_ON_THREADS + join(); +#endif + if (error) throw error.getLink(); diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index d93ade88777..c6149a23d68 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -2033,6 +2033,8 @@ void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorI if (alldone) break; } + for (idx = 0; idx < numThreads; idx++) + threads.item(idx).join(); for (idx = 0; idx < numThreads; idx++) threads.item(idx).queryThrowError(); } @@ -2566,6 +2568,12 @@ void FileSprayer::performTransfer() numSlavesCompleted++; } +#ifdef RUN_SLAVES_ON_THREADS + //Ensure that the transfer slave threads have terminated before continuing + ForEachItemIn(idx4, transferSlaves) + transferSlaves.item(idx4).join(); +#endif + if (error) throw LINK(error); From c9d1604a02ee16c2b9c24c3c8567129dbbb3a96a Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 26 Mar 2024 15:53:07 +0000 Subject: [PATCH 2/4] HPCC-31525 Report a more specific error if part of an index file has been zeroed Signed-off-by: Gavin Halliday --- system/jhtree/ctfile.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/system/jhtree/ctfile.cpp b/system/jhtree/ctfile.cpp index 132145626a5..949b6028bff 100644 --- a/system/jhtree/ctfile.cpp +++ b/system/jhtree/ctfile.cpp @@ -965,6 +965,17 @@ void 
CJHLegacySearchNode::load(CKeyHdr *_keyHdr, const void *rawData, offset_t _ else { keyBuf = NULL; expandedSize = 0; + if (hdr.nodeType == NodeBranch) + { + if ((hdr.leftSib == 0) && (hdr.rightSib == 0)) + { + //Sanity check to catch error where a section of the file has unexpectedly been zeroed. + //which is otherwise tricky to track down. + //This can only legally happen if there is an index with 0 entries + if (keyHdr->getNumRecords() != 0) + throw MakeStringException(0, "Zeroed index node detected at offset %llu", getFpos()); + } + } } } } From 00e8c3458844930308605dbb6cace98d5d36167b Mon Sep 17 00:00:00 2001 From: Rodrigo Pastrana Date: Fri, 23 Feb 2024 23:39:20 -0500 Subject: [PATCH 3/4] HPCC-31370 SoapCall Instrumentation code review - Remove unnecessary commented out code - Rename setSpanStatus param to spanSucceeded - Adds escapeScope param - Adds method to setSpanURL attributes - Creates new span wrapping socket operations - Creates soapcall activity level span Signed-off-by: Rodrigo Pastrana --- common/thorhelper/thorsoapcall.cpp | 60 ++++++++++++++++++++++++++++-- roxie/ccd/ccdlistener.cpp | 2 +- system/jlib/jtrace.cpp | 6 +-- system/jlib/jtrace.hpp | 8 ++-- testing/unittests/jlibtests.cpp | 9 +---- 5 files changed, 66 insertions(+), 19 deletions(-) diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index 903bbb52909..0af7bf97866 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -254,6 +254,24 @@ class Url : public CInterface, implements IInterface } }; +//if Url was globally accessible we could define this in jtrace instead +//http span standards documented here: https://opentelemetry.io/docs/specs/semconv/http/http-spans/ +void setSpanURLAttributes(ISpan * clientSpan, const Url & url) +{ + if (clientSpan == nullptr) + return; + + clientSpan->setSpanAttribute("http.request.method", "POST"); //apparently hardcoded to post + //Even though a "service" and + //a "method" are tracked 
and sometimes + //the target service name is used + //and there's code and comments suggesting only + //GET is supported... + clientSpan->setSpanAttribute("network.peer.address", url.host.get()); + clientSpan->setSpanAttribute("network.peer.port", url.port); + clientSpan->setSpanAttribute("network.protocol.name", url.method.get()); +} + typedef IArrayOf UrlArray; //================================================================================================= @@ -963,12 +981,14 @@ class CWSCHelper : implements IWSCHelper, public CInterface WSCType wscType; public: + Owned activitySpanScope; IMPLEMENT_IINTERFACE; CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert, const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType) : logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor) { + activitySpanScope.setown(logctx.queryActiveSpan()->createInternalSpan(_wscType == STsoap ? 
"SoapCall Activity": "HTTPCall Activity")); wscMode = _wscMode; wscType = _wscType; done = 0; @@ -1034,7 +1054,6 @@ class CWSCHelper : implements IWSCHelper, public CInterface s.setown(helper->getXpathHintsXml()); xpathHints.setown(createPTreeFromXMLString(s.get())); } - if (wscType == STsoap) { soapaction.set(s.setown(helper->getSoapAction())); @@ -1972,7 +1991,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING)) request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING); #endif - Owned traceHeaders = master->logctx.getClientHeaders(); + Owned traceHeaders = ::getClientHeaders(master->activitySpanScope); if (traceHeaders) { Owned iter = traceHeaders->getIterator(); @@ -2457,6 +2476,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ... if (strieq(url.method, "https")) proto = PersistentProtocol::ProtoTLS; + bool shouldClose = false; Owned psock = master->usePersistConnections() ? 
persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr; if (psock) @@ -2504,6 +2524,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo { if (master->timeLimitExceeded) { + master->activitySpanScope->recordError(SpanError("Time Limit Exceeded", e->errorCode(), true, true)); master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS); processException(url, inputRows, e); return; @@ -2511,6 +2532,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (e->errorCode() == ROXIE_ABORT_EVENT) { + master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true)); StringBuffer s; master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str()); throw; @@ -2523,19 +2545,25 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo idx = 0; if (idx==startidx) { + master->activitySpanScope->recordException(e, true, true); StringBuffer s; master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str()); processException(url, inputRows, e); return; } } while (blacklist->blacklisted(url.port, url.host)); + + master->activitySpanScope->recordException(e, false, false); //Record the exception, but don't set failure } } try { checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); + OwnedSpanScope socketOperationSpan = master->activitySpanScope->createClientSpan("Socket Write"); + setSpanURLAttributes(socketOperationSpan, url); socket->write(request.str(), request.length()); + if (soapTraceLevel > 4) master->logctx.CTXLOG("%s: sent request (%s) to %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port); checkTimeLimitExceeded(&remainingMS); @@ -2544,6 +2572,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo bool keepAlive2; StringBuffer 
contentType; int rval = readHttpResponse(response, socket, keepAlive2, contentType); + socketOperationSpan->setSpanAttribute("http.response.status_code", (int64_t)rval); keepAlive = keepAlive && keepAlive2; if (soapTraceLevel > 4) @@ -2551,16 +2580,22 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (rval != 200) { + socketOperationSpan->setSpanStatusSuccess(false); if (rval == 503) + { + socketOperationSpan->recordError(SpanError("Server Too Busy", 1001, true, true)); throw new ReceivedRoxieException(1001, "Server Too Busy"); + } StringBuffer text; text.appendf("HTTP error (%d) in processQuery",rval); rtlAddExceptionTag(text, "soapresponse", response.str()); + socketOperationSpan->recordError(SpanError(text.str(), -1, true, true)); throw MakeStringExceptionDirect(-1, text.str()); } if (response.length() == 0) { + socketOperationSpan->recordError(SpanError("Zero length response in processQuery", -1, true, true)); throw MakeStringException(-1, "Zero length response in processQuery"); } checkTimeLimitExceeded(&remainingMS); @@ -2575,6 +2610,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo else if (keepAlive) persistentHandler->add(socket, &ep, proto); } + + socketOperationSpan->setSpanStatusSuccess(true); break; } catch (IReceivedRoxieException *e) @@ -2606,6 +2643,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo processException(url, e->errorRow(), e); else processException(url, inputRows, e); + + master->activitySpanScope->recordException(e, true, true); break; } } @@ -2616,7 +2655,9 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (master->timeLimitExceeded) { processException(url, inputRows, e); - master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS); + VStringBuffer msg("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), 
master->timeLimitMS); + master->logctx.CTXLOG("%s", msg.str()); + master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true)); break; } @@ -2624,6 +2665,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo { StringBuffer s; master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str()); + master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true)); throw; } @@ -2634,12 +2676,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (numRetries >= master->maxRetries) { // error affects all inputRows - master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries); + VStringBuffer msg("Exiting: maxRetries %d exceeded", master->maxRetries); + master->logctx.CTXLOG("%s", msg.str()); + master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true)); processException(url, inputRows, e); break; } numRetries++; master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries); + master->activitySpanScope->recordException(e, false, false); e->Release(); } catch (std::exception & es) @@ -2647,13 +2692,20 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); if(dynamic_cast(&es)) + { + master->activitySpanScope->recordError("std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery"); throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery"); + } + + master->activitySpanScope->recordError(es.what()); throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what()); } catch (...) 
{ if (master->usePersistConnections() && isReused) persistentHandler->doneUsing(socket, false); + + master->activitySpanScope->recordError(SpanError("Unknown exception in processQuery", -1, true, true)); throw MakeStringException(-1, "Unknown exception in processQuery"); } } diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 370a83bcdc7..6e319d2ae67 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -949,7 +949,7 @@ void ContextLogger::exportStatsToSpan(bool failed, stat_type elapsedNs, unsigned { if (activeSpan->isRecording()) { - activeSpan->setSpanStatus(failed); + activeSpan->setSpanStatusSuccess(!failed); setSpanAttribute("time_elapsed", elapsedNs); if (memused) diff --git a/system/jlib/jtrace.cpp b/system/jlib/jtrace.cpp index feabb56304e..8c18061fa4b 100644 --- a/system/jlib/jtrace.cpp +++ b/system/jlib/jtrace.cpp @@ -748,11 +748,11 @@ class CSpan : public CInterfaceOf return span ? span->IsRecording() : false; } - virtual void setSpanStatus(bool spanFailed, const char * statusMessage) + virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage) { if (span != nullptr) { - span->SetStatus(spanFailed ? opentelemetry::trace::StatusCode::kError : opentelemetry::trace::StatusCode::kOk, statusMessage); + span->SetStatus(spanSucceeded ? 
opentelemetry::trace::StatusCode::kOk : opentelemetry::trace::StatusCode::kError, statusMessage); } } @@ -894,7 +894,7 @@ class CNullSpan final : public CInterfaceOf virtual void recordException(IException * e, bool spanFailed, bool escapedScope) override {} virtual void recordError(const SpanError & error) override {}; - virtual void setSpanStatus(bool spanFailed, const char * statusMessage) override {} + virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage) override {} virtual const char* queryGlobalId() const override { return nullptr; } virtual const char* queryCallerId() const override { return nullptr; } diff --git a/system/jlib/jtrace.hpp b/system/jlib/jtrace.hpp index 31a957e289c..ba393c2c0c1 100644 --- a/system/jlib/jtrace.hpp +++ b/system/jlib/jtrace.hpp @@ -94,11 +94,11 @@ struct SpanError : errorMessage(_errorMessage), errorCode(_errorCode), spanFailed(_spanFailed), escapeScope(_escapeScope) {} /** - * @brief Sets the span status. - * @param _spanFailed Flag indicating whether the span failed. + * @brief Sets the span status success. + * @param _spanSucceeded Flag indicating whether the span succeeded. * @param _spanScopeEscape Flag indicating whether the exception escaped the scope of the span. */ - void setSpanStatus(bool _spanFailed, bool _spanScopeEscape) { spanFailed = _spanFailed; escapeScope = _spanScopeEscape;} + void setSpanStatusSuccess(bool _spanSucceeded, bool _spanScopeEscape) { spanFailed = !_spanSucceeded; escapeScope = _spanScopeEscape;} /** * @brief Sets the error message and error code. @@ -123,7 +123,7 @@ interface ISpan : extends IInterface virtual bool isRecording() const = 0; // Is it worth adding any events/attributes to this span? 
virtual void recordException(IException * e, bool spanFailed = true, bool escapedScope = true) = 0; virtual void recordError(const SpanError & error = SpanError()) = 0; - virtual void setSpanStatus(bool spanFailed, const char * statusMessage = NO_STATUS_MESSAGE) = 0; + virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage = NO_STATUS_MESSAGE) = 0; virtual ISpan * createClientSpan(const char * name) = 0; virtual ISpan * createInternalSpan(const char * name) = 0; diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index e8001c068ed..4ea638fad96 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -288,17 +288,12 @@ class JlibTraceTest : public CppUnit::TestFixture { OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrorSpanEscaped", emptyMockHTTPHeaders); - SpanError * error = new SpanError("hello"); - error->setSpanStatus(true, true); - serverSpan->recordError(*error); + serverSpan->recordError(SpanError("hello", -1, true, true)); //error message hello, no error code, error caused failure, and error caused escape }//{ "type": "span", "name": "failedErrorSpanEscaped", "trace_id": "634f386c18a6140544c980e0d5a15905", "span_id": "e2f59c48f63a8f82", "start": 1709675508231168974, "duration": 7731717678, "status": "Error", "kind": "Server", "description": "hello", "instrumented_library": "unittests", "events":[ { "name": "Exception", "time_stamp": 1709675512164430668, "attributes": {"escaped": 1,"message": "hello" } } ] } { OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrEscapedMsgErrCode", emptyMockHTTPHeaders); - SpanError * error = new SpanError(); - error->setSpanStatus(true, true); - error->setError("hello", 34); - serverSpan->recordError(*error); + serverSpan->recordError(SpanError("hello", 34, true, true)); //error message hello, error code 34, error caused failure, and error caused escape }//failedErrEscapedMsgErrCode { From 
8ce96c9158e2ccce5177667d19d105046ae03296 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Wed, 27 Mar 2024 12:32:04 +0000 Subject: [PATCH 4/4] HPCC-31530 Remove stale LZ groups Remove existing Dali LZ groups before creating groups from the environment or storage planes. This avoids stale groups being left behind, which in turn can cause systems to spuriously see them and try to reference nodes that no longer exist, causing stalls and delays to be introduced. Signed-off-by: Jake Smith --- dali/base/dadfs.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 7efba0a945f..81202ff10e1 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -10525,6 +10525,18 @@ class CInitGroups } return true; } + void clearLZGroups() + { + if (!writeLock) + throw makeStringException(0, "CInitGroups::clearLZGroups called in read-only mode"); + IPropertyTree *root = groupsconnlock.conn->queryRoot(); + std::vector toDelete; + Owned groups = root->getElements("Group[@kind='dropzone']"); + ForEach(*groups) + toDelete.push_back(&groups->query()); + for (auto &group: toDelete) + root->removeTree(group); + } void constructGroups(bool force, StringBuffer &messages, IPropertyTree *oldEnvironment) { Owned conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT); @@ -10717,12 +10729,14 @@ class CInitGroups void initClusterGroups(bool force, StringBuffer &response, IPropertyTree *oldEnvironment, unsigned timems) { CInitGroups init(timems, true); + init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated init.constructGroups(force, response, oldEnvironment); } void initClusterAndStoragePlaneGroups(bool force, IPropertyTree *oldEnvironment, unsigned timems) { CInitGroups init(timems, true); + init.clearLZGroups(); // clear existing LZ groups, current ones will be recreated StringBuffer response; init.constructGroups(force, response, oldEnvironment);