Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hpcc 30370 Instrument soapcall function calls #18336

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ class Url : public CInterface, implements IInterface
}
};

//If Url was globally accessible we could define this in jtrace instead.
//HTTP span standards are documented here: https://opentelemetry.io/docs/specs/semconv/http/http-spans/
//
//Copies the target described by url onto clientSpan as span attributes:
//  http.request.method   - always the literal "POST"
//  network.peer.address  - url.host
//  network.peer.port     - url.port
//  network.protocol.name - url.method (elsewhere in this file url.method is compared against "https",
//                          i.e. it carries the URL scheme rather than an HTTP verb)
//A null clientSpan is tolerated; the call is then a no-op.
void setSpanURLAttributes(ISpan * clientSpan, const Url & url)
{
if (clientSpan == nullptr)
return;

clientSpan->setSpanAttribute("http.request.method", "POST"); //apparently hardcoded to post
rpastrana marked this conversation as resolved.
Show resolved Hide resolved
//NOTE: "POST" is reported unconditionally,
//even though a "service" and a "method" are tracked,
//sometimes the target service name is used,
//and there's code and comments suggesting only
//GET is supported...
clientSpan->setSpanAttribute("network.peer.address", url.host.get());
clientSpan->setSpanAttribute("network.peer.port", url.port);
clientSpan->setSpanAttribute("network.protocol.name", url.method.get());
}

typedef IArrayOf<Url> UrlArray;

//=================================================================================================
Expand Down Expand Up @@ -963,12 +981,14 @@ class CWSCHelper : implements IWSCHelper, public CInterface
WSCType wscType;

public:
Owned<ISpan> activitySpanScope;
IMPLEMENT_IINTERFACE;

CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert,
const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
: logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
{
activitySpanScope.setown(logctx.queryActiveSpan()->createInternalSpan(_wscType == STsoap ? "SoapCall Activity": "HTTPCall Activity"));
wscMode = _wscMode;
wscType = _wscType;
done = 0;
Expand Down Expand Up @@ -1034,7 +1054,6 @@ class CWSCHelper : implements IWSCHelper, public CInterface
s.setown(helper->getXpathHintsXml());
xpathHints.setown(createPTreeFromXMLString(s.get()));
}

if (wscType == STsoap)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be worth adding an attribute for the activity id - that will be very useful to the developer.

activitySpanScope->setSpanAttribute("activity.id", )?

It might need to be a new parameter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds useful, but I couldn't find a reasonable way to pass that info down to the helper.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added HPCC-31527 to add that as a separate task - it is relatively high priority.

{
soapaction.set(s.setown(helper->getSoapAction()));
Expand Down Expand Up @@ -1972,7 +1991,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
#endif
Owned<IProperties> traceHeaders = master->logctx.getClientHeaders();
Owned<IProperties> traceHeaders = ::getClientHeaders(master->activitySpanScope);
if (traceHeaders)
{
Owned<IPropertyIterator> iter = traceHeaders->getIterator();
Expand Down Expand Up @@ -2457,6 +2476,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
checkTimeLimitExceeded(&remainingMS); // after ep.set which might make a potentially long getaddrinfo lookup ...
if (strieq(url.method, "https"))
proto = PersistentProtocol::ProtoTLS;

bool shouldClose = false;
Owned<ISocket> psock = master->usePersistConnections() ? persistentHandler->getAvailable(&ep, &shouldClose, proto) : nullptr;
if (psock)
Expand Down Expand Up @@ -2504,13 +2524,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
if (master->timeLimitExceeded)
{
master->activitySpanScope->recordError(SpanError("Time Limit Exceeded", e->errorCode(), true, true));
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
processException(url, inputRows, e);
return;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
throw;
Expand All @@ -2523,19 +2545,25 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
idx = 0;
if (idx==startidx)
{
master->activitySpanScope->recordException(e, true, true);
StringBuffer s;
master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
processException(url, inputRows, e);
return;
}
} while (blacklist->blacklisted(url.port, url.host));

master->activitySpanScope->recordException(e, false, false); //Record the exception, but don't set failure
}
}
try
{
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
OwnedSpanScope socketOperationSpan = master->activitySpanScope->createClientSpan("Socket Write");
setSpanURLAttributes(socketOperationSpan, url);
socket->write(request.str(), request.length());

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: sent request (%s) to %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2544,23 +2572,30 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
bool keepAlive2;
StringBuffer contentType;
int rval = readHttpResponse(response, socket, keepAlive2, contentType);
socketOperationSpan->setSpanAttribute("http.response.status_code", (int64_t)rval);
keepAlive = keepAlive && keepAlive2;

if (soapTraceLevel > 4)
master->logctx.CTXLOG("%s: received response (%s) from %s:%d", getWsCallTypeName(master->wscType),master->service.str(), url.host.str(), url.port);

if (rval != 200)
{
socketOperationSpan->setSpanStatusSuccess(false);
if (rval == 503)
{
socketOperationSpan->recordError(SpanError("Server Too Busy", 1001, true, true));
throw new ReceivedRoxieException(1001, "Server Too Busy");
}

StringBuffer text;
text.appendf("HTTP error (%d) in processQuery",rval);
rtlAddExceptionTag(text, "soapresponse", response.str());
socketOperationSpan->recordError(SpanError(text.str(), -1, true, true));
throw MakeStringExceptionDirect(-1, text.str());
}
if (response.length() == 0)
{
socketOperationSpan->recordError(SpanError("Zero length response in processQuery", -1, true, true));
throw MakeStringException(-1, "Zero length response in processQuery");
}
checkTimeLimitExceeded(&remainingMS);
Expand All @@ -2575,6 +2610,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
else if (keepAlive)
persistentHandler->add(socket, &ep, proto);
}

socketOperationSpan->setSpanStatusSuccess(true);
break;
}
catch (IReceivedRoxieException *e)
Expand Down Expand Up @@ -2606,6 +2643,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
processException(url, e->errorRow(), e);
else
processException(url, inputRows, e);

master->activitySpanScope->recordException(e, true, true);
break;
}
}
Expand All @@ -2616,14 +2655,17 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (master->timeLimitExceeded)
{
processException(url, inputRows, e);
master->logctx.CTXLOG("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
VStringBuffer msg("%s exiting: time limit (%ums) exceeded", getWsCallTypeName(master->wscType), master->timeLimitMS);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
break;
}

if (e->errorCode() == ROXIE_ABORT_EVENT)
{
StringBuffer s;
master->logctx.CTXLOG("%s exiting: Roxie Abort : %s", getWsCallTypeName(master->wscType),e->errorMessage(s).str());
master->activitySpanScope->recordError(SpanError("Aborted", e->errorCode(), true, true));
throw;
}

Expand All @@ -2634,26 +2676,36 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (numRetries >= master->maxRetries)
{
// error affects all inputRows
master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries);
VStringBuffer msg("Exiting: maxRetries %d exceeded", master->maxRetries);
master->logctx.CTXLOG("%s", msg.str());
master->activitySpanScope->recordError(SpanError(msg.str(), e->errorCode(), true, true));
processException(url, inputRows, e);
break;
}
numRetries++;
master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries);
master->activitySpanScope->recordException(e, false, false);
e->Release();
}
catch (std::exception & es)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);
if(dynamic_cast<std::bad_alloc *>(&es))
{
master->activitySpanScope->recordError("std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
}

master->activitySpanScope->recordError(es.what());
throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
}
catch (...)
{
if (master->usePersistConnections() && isReused)
persistentHandler->doneUsing(socket, false);

master->activitySpanScope->recordError(SpanError("Unknown exception in processQuery", -1, true, true));
throw MakeStringException(-1, "Unknown exception in processQuery");
}
}
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ void ContextLogger::exportStatsToSpan(bool failed, stat_type elapsedNs, unsigned
{
if (activeSpan->isRecording())
{
activeSpan->setSpanStatus(failed);
activeSpan->setSpanStatusSuccess(!failed);
setSpanAttribute("time_elapsed", elapsedNs);

if (memused)
Expand Down
6 changes: 3 additions & 3 deletions system/jlib/jtrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,11 +748,11 @@ class CSpan : public CInterfaceOf<ISpan>
return span ? span->IsRecording() : false;
}

virtual void setSpanStatus(bool spanFailed, const char * statusMessage)
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage)
{
if (span != nullptr)
{
span->SetStatus(spanFailed ? opentelemetry::trace::StatusCode::kError : opentelemetry::trace::StatusCode::kOk, statusMessage);
span->SetStatus(spanSucceeded ? opentelemetry::trace::StatusCode::kOk : opentelemetry::trace::StatusCode::kError, statusMessage);
}
}

Expand Down Expand Up @@ -894,7 +894,7 @@ class CNullSpan final : public CInterfaceOf<ISpan>

virtual void recordException(IException * e, bool spanFailed, bool escapedScope) override {}
virtual void recordError(const SpanError & error) override {};
virtual void setSpanStatus(bool spanFailed, const char * statusMessage) override {}
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage) override {}

virtual const char* queryGlobalId() const override { return nullptr; }
virtual const char* queryCallerId() const override { return nullptr; }
Expand Down
8 changes: 4 additions & 4 deletions system/jlib/jtrace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ struct SpanError
: errorMessage(_errorMessage), errorCode(_errorCode), spanFailed(_spanFailed), escapeScope(_escapeScope) {}

/**
* @brief Sets the span status.
* @param _spanFailed Flag indicating whether the span failed.
* @brief Sets the span status success.
* @param _spanSucceeded Flag indicating whether the span succeeded.
* @param _spanScopeEscape Flag indicating whether the exception escaped the scope of the span.
*/
void setSpanStatus(bool _spanFailed, bool _spanScopeEscape) { spanFailed = _spanFailed; escapeScope = _spanScopeEscape;}
void setSpanStatusSuccess(bool _spanSucceeded, bool _spanScopeEscape) { spanFailed = !_spanSucceeded; escapeScope = _spanScopeEscape;}

/**
* @brief Sets the error message and error code.
Expand All @@ -123,7 +123,7 @@ interface ISpan : extends IInterface
virtual bool isRecording() const = 0; // Is it worth adding any events/attributes to this span?
virtual void recordException(IException * e, bool spanFailed = true, bool escapedScope = true) = 0;
virtual void recordError(const SpanError & error = SpanError()) = 0;
virtual void setSpanStatus(bool spanFailed, const char * statusMessage = NO_STATUS_MESSAGE) = 0;
virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage = NO_STATUS_MESSAGE) = 0;

virtual ISpan * createClientSpan(const char * name) = 0;
virtual ISpan * createInternalSpan(const char * name) = 0;
Expand Down
9 changes: 2 additions & 7 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,12 @@ class JlibTraceTest : public CppUnit::TestFixture

{
OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrorSpanEscaped", emptyMockHTTPHeaders);
SpanError * error = new SpanError("hello");
error->setSpanStatus(true, true);
serverSpan->recordError(*error);
serverSpan->recordError(SpanError("hello", -1, true, true)); //error message hello, no error code, error caused failure, and error caused escape
}//{ "type": "span", "name": "failedErrorSpanEscaped", "trace_id": "634f386c18a6140544c980e0d5a15905", "span_id": "e2f59c48f63a8f82", "start": 1709675508231168974, "duration": 7731717678, "status": "Error", "kind": "Server", "description": "hello", "instrumented_library": "unittests", "events":[ { "name": "Exception", "time_stamp": 1709675512164430668, "attributes": {"escaped": 1,"message": "hello" } } ] }

{
OwnedSpanScope serverSpan = queryTraceManager().createServerSpan("failedErrEscapedMsgErrCode", emptyMockHTTPHeaders);
SpanError * error = new SpanError();
error->setSpanStatus(true, true);
error->setError("hello", 34);
serverSpan->recordError(*error);
serverSpan->recordError(SpanError("hello", 34, true, true)); //error message hello, error code 34, error caused failure, and error caused escape
}//failedErrEscapedMsgErrCode

{
Expand Down
Loading