From 429f1f7ad3aff86397871f7df5cb1357fcab19b4 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 25 Sep 2023 13:45:44 +0100 Subject: [PATCH] HPCC-30350 Add open telemetry support to roxie/hthor/thor Signed-off-by: Gavin Halliday --- common/thorhelper/roxiehelper.hpp | 1 + common/thorhelper/thorsoapcall.cpp | 19 ++- common/workunit/workunit.cpp | 58 +++++++++ common/workunit/workunit.hpp | 4 + ecl/eclagent/eclagent.cpp | 27 +--- roxie/ccd/ccd.hpp | 24 ++-- roxie/ccd/ccdcontext.cpp | 8 +- roxie/ccd/ccdlistener.cpp | 86 ++++++------- roxie/ccd/ccdprotocol.cpp | 14 +- roxie/ccd/ccdserver.cpp | 17 +-- roxie/ccd/hpccprotocol.hpp | 3 +- system/jlib/jlog.cpp | 29 +++-- system/jlib/jlog.hpp | 4 +- system/jlib/jprop.cpp | 16 +++ system/jlib/jprop.hpp | 1 + system/jlib/jtrace.cpp | 197 +++++++++++++++++------------ system/jlib/jtrace.hpp | 39 +++--- thorlcr/graph/thgraphmaster.cpp | 27 +--- thorlcr/graph/thgraphslave.cpp | 26 +--- thorlcr/thorutil/thormisc.hpp | 24 ++-- 20 files changed, 341 insertions(+), 283 deletions(-) diff --git a/common/thorhelper/roxiehelper.hpp b/common/thorhelper/roxiehelper.hpp index eb81c8e847d..1dfe4ae9553 100644 --- a/common/thorhelper/roxiehelper.hpp +++ b/common/thorhelper/roxiehelper.hpp @@ -210,6 +210,7 @@ class THORHELPER_API HttpHelper : public CInterface return getContentTypeMlFormat(); } IProperties *queryUrlParameters(){return parameters;} + const IProperties * queryRequestHeaders() const { return reqHeaders; } bool validateHttpGetTarget(const char *target) { if (!target) diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index ae9db5ffd49..67a1dcdd5f0 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -1911,13 +1911,20 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING)) request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING); #endif - if (!isEmptyString(master->logctx.queryGlobalId())) + Owned traceHeaders = master->logctx.getClientHeaders(); + if (traceHeaders) { - if (!httpHeaderBlockContainsHeader(httpheaders, kGlobalIdHttpHeaderName)) - request.append(kGlobalIdHttpHeaderName).append(": ").append(master->logctx.queryGlobalId()).append("\r\n"); - - if (!isEmptyString(master->logctx.queryLocalId()) && !httpHeaderBlockContainsHeader(httpheaders, kCallerIdHttpHeaderName)) - request.append(kCallerIdHttpHeaderName).append(": ").append(master->logctx.queryLocalId()).append("\r\n"); //our localId is reciever's callerId + Owned iter = traceHeaders->getIterator(); + ForEach(*iter) + { + const char * key = iter->getPropKey(); + if (!httpHeaderBlockContainsHeader(httpheaders, key)) + { + const char * value = traceHeaders->queryProp(key); + if (!isEmptyString(value)) + request.append(key).append(": ").append(value).append("\r\n"); + } + } } if (master->wscType == STsoap) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index ad13fd22bda..30e36b76e82 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -14634,3 +14634,61 @@ bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const throwUnexpected(); } #endif + + +//The names of the debug options used to serialize trace info - lower case to ensure they also work on the property tree (in thor slaves) +static constexpr const char * traceDebugOptions[] = { "globalid", "callerid", "ottraceparent", "ottracestate" }; +//The names of the headers containing the trace info +static constexpr const char * traceHeaderNames[] = { "global-id", "caller-id", "traceparent", "tracestate" }; +static_assert(_elements_in(traceDebugOptions) == _elements_in(traceHeaderNames), "Inconsistent tracePropertyNames, traceHeaderNames arrays"); + +IProperties * extractTraceDebugOptions(IConstWorkUnit * source) +{ + Owned target = createProperties(true); + SCMStringBuffer temp; + for (unsigned i=0; i < _elements_in(traceDebugOptions); i++) + { + const char * debugOption = traceDebugOptions[i]; + const char * headerName = traceHeaderNames[i]; + if (source->hasDebugValue(debugOption)) + { + temp.clear(); + source->getDebugValue(debugOption, temp); + target->setProp(headerName, temp.str()); + } + } + return target.getClear(); +} + +IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions) +{ + Owned target = createProperties(true); + if (debugOptions) + { + for (unsigned i=0; i < _elements_in(traceDebugOptions); i++) + { + const char * debugOption = traceDebugOptions[i]; + const char * headerName = traceHeaderNames[i]; + if (debugOptions->hasProp(debugOption)) + { + const char * value = debugOptions->queryProp(debugOption); + target->setProp(headerName, value); + } + } + } + return target.getClear(); +} + +void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source) +{ + for (unsigned i=0; i < _elements_in(traceDebugOptions); i++) + { + const char * headerName = traceHeaderNames[i]; + const char * debugOption = traceDebugOptions[i]; + if (source->hasProp(headerName)) + { + const char * value = source->queryProp(headerName); + target->setDebugValue(debugOption, value, true); + } + } +} diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 2143e709c70..60c06b5a56d 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1764,6 +1764,10 @@ class WORKUNIT_API WorkUnitErrorReceiver : implements IErrorReceiver, public CIn bool removeTimeStamp; }; +extern WORKUNIT_API IProperties * extractTraceDebugOptions(IConstWorkUnit * source); +extern WORKUNIT_API IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions); +extern WORKUNIT_API void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source); + extern WORKUNIT_API void addWorkunitException(IWorkUnit * wu, IError * error, bool removeTimeStamp); inline bool isGlobalScope(const char * scope) { return scope && (streq(scope, GLOBAL_SCOPE) || streq(scope, LEGACY_GLOBAL_SCOPE)); } diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index fb37eb3bf9e..d6c749a09c7 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -2130,30 +2130,9 @@ void EclAgent::runProcess(IEclProcess *process) assertex(rowManager==NULL); allocatorMetaCache.setown(createRowAllocatorCache(this)); - if (queryWorkUnit()->hasDebugValue("GlobalId")) - { - SCMStringBuffer globalId; - queryWorkUnit()->getDebugValue("GlobalId", globalId); - if (globalId.length()) - { - SocketEndpoint thorEp; - thorEp.setLocalHost(0); - updateDummyContextLogger().setGlobalId(globalId.str(), thorEp, GetCurrentProcessId()); - - VStringBuffer msg("GlobalId: %s", globalId.str()); - SCMStringBuffer txId; - queryWorkUnit()->getDebugValue("CallerId", txId); - if (txId.length()) - { - updateDummyContextLogger().setCallerId(txId.str()); - msg.append(", CallerId: ").append(txId.str()); - } - txId.set(updateDummyContextLogger().queryLocalId()); - if (txId.length()) - msg.append(", LocalId: ").append(txId.str()); - updateDummyContextLogger().CTXLOG("%s", msg.str()); - } - } + Owned traceHeaders = extractTraceDebugOptions(queryWorkUnit()); + Owned requestSpan = queryTraceManager().createServerSpan(queryWorkUnit()->queryWuid(), traceHeaders); + updateDummyContextLogger().setActiveSpan(requestSpan); // a component may specify an alternate name for the agent/workflow memory area, // e.g. Thor specifies in "eclAgentMemory" diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index fb9b07c8ad6..1999986d9e5 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -589,7 +589,7 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface mutable bool aborted; mutable CIArrayOf log; private: - LogTrace logTrace; + Owned activeSpan; ContextLogger(const ContextLogger &); // Disable copy constructor public: IMPLEMENT_IINTERFACE; @@ -742,25 +742,33 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface { stats.reset(); } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { - logTrace.setGlobalId(id); + activeSpan.set(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { - logTrace.setCallerId(id); + if (!activeSpan) + return nullptr; + return ::getClientHeaders(activeSpan); } virtual const char *queryGlobalId() const override { - return logTrace.queryGlobalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryGlobalId(); } virtual const char *queryCallerId() const override { - return logTrace.queryCallerId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryCallerId(); } virtual const char *queryLocalId() const override { - return logTrace.queryLocalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryLocalId(); } virtual const CRuntimeStatisticCollection & queryStats() const override { diff --git a/roxie/ccd/ccdcontext.cpp b/roxie/ccd/ccdcontext.cpp index 5b7ca7e0daf..70afbbb7148 100644 --- a/roxie/ccd/ccdcontext.cpp +++ b/roxie/ccd/ccdcontext.cpp @@ -1379,13 +1379,13 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext { return logctx.queryTraceLevel(); } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { - const_cast(logctx).setGlobalId(id, ep, pid); + const_cast(logctx).setActiveSpan(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { - const_cast(logctx).setCallerId(id); + return logctx.getClientHeaders(); } virtual const char *queryGlobalId() const override { diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index f575adf82c9..8daa176605e 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1177,17 +1177,11 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker wu.setown(daliHelper->attachWorkunit(wuid.get())); } Owned logctx = new StringContextLogger(wuid.get()); - if (wu->hasDebugValue("GlobalId")) - { - SCMStringBuffer globalId; - SocketEndpoint ep; - ep.setLocalHost(0); - SCMStringBuffer callerId; - logctx->setGlobalId(wu->getDebugValue("GlobalId", globalId).str(), ep, GetCurrentProcessId()); - wu->getDebugValue("CallerId", callerId); - if (callerId.length()) - logctx->setCallerId(callerId.str()); - } + + Owned traceHeaders = extractTraceDebugOptions(wu); + Owned requestSpan = queryTraceManager().createServerSpan(wu->queryWuid(), traceHeaders); + updateDummyContextLogger().setActiveSpan(requestSpan); + Owned queryFactory; try { @@ -1308,6 +1302,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker StringBuffer s; logctx.getStats(s); + //MORE: logctx.queryActiveSpan()->getLogPrefix() or similar. StringBuffer txidInfo; const char *globalId = logctx.queryGlobalId(); if (globalId && *globalId) @@ -1357,7 +1352,6 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte public: StringAttr queryName; StringAttr uid = "-"; - StringAttr callerId; Owned cascade; Owned debuggerContext; Owned debugCmdHandler; @@ -1367,6 +1361,7 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte SocketEndpoint ep; time_t startTime; bool notedActive = false; + bool ensureGlobalIdExists = false; public: IMPLEMENT_IINTERFACE; @@ -1415,9 +1410,7 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte IConstWorkUnit *workunit = queryFactory->queryWorkUnit(); if (workunit && workunit->getDebugValueBool("generateGlobalId", false) && isEmptyString(logctx->queryGlobalId())) { - StringBuffer gen_id; - appendGloballyUniqueId(gen_id); - setTransactionId(gen_id.str(), nullptr, true); + ensureGlobalIdExists = true; } } virtual void noteQueryActive() @@ -1445,44 +1438,38 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte return *cascade; } - virtual void setTransactionId(const char *id, const char *caller, bool global) override + virtual void startSpan(const char * id, const IProperties * headers) override { - if (!id || !*id) - return; - uid.set(id); - ensureContextLogger(); - if (!global && !isEmptyString(logctx->queryGlobalId())) //globalId wins - return; - StringBuffer s; - ep.getHostText(s).appendf(":%u{%s}", ep.port, uid.str()); //keep no matter what for existing log parsers - if (global) + Linked allHeaders = headers; + SpanFlags flags = (ensureGlobalIdExists) ? SpanFlags::EnsureGlobalId : SpanFlags::None; + if (headers && !headers->queryProp("global-id")) { - s.append('['); - logctx->setGlobalId(id, ep, 0); - if (caller && *caller) + //If an id is provided, and we are not automatically creating global ids, use the id as the global-id + if ((id && *id) && !ensureGlobalIdExists) { - setCallerId(caller); - logctx->setCallerId(caller); + Owned clonedHeaders = cloneProperties(headers, true); + clonedHeaders->setProp("global-id", id); + allHeaders.setown(clonedHeaders.getClear()); } - if (callerId.length()) - s.appendf("caller:%s", callerId.str()); + } - const char *local = logctx->queryLocalId(); //generated in setGlobalId above - if (local && *local) - { - if (callerId.length()) - s.append(','); - s.appendf("local:%s", local); - } - s.append("]"); + ensureContextLogger(); + + Owned requestSpan = queryTraceManager().createServerSpan("request", allHeaders, flags); + logctx->setActiveSpan(requestSpan); + + const char * globalId = requestSpan->queryGlobalId(); + if (globalId) + id = globalId; + + uid.set(id); + if (id) + { + StringBuffer s; + ep.getHostText(s).appendf(":%u{%s}", ep.port, id); //keep no matter what for existing log parsers + requestSpan->getLogPrefix(s); + logctx->set(s.str()); } - logctx->set(s.str()); - } - virtual void setCallerId(const char *id) - { - if (!id || !*id) - return; - callerId.set(id); } inline IDebuggerContext &ensureDebuggerContext(const char *id) { @@ -1602,9 +1589,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte StringBuffer s; logctx->getStats(s); + const char * callerId = logctx->queryCallerId(); StringBuffer txIds; - if (callerId.length()) - txIds.appendf("caller: %s", callerId.str()); + if (!isEmptyString(callerId)) + txIds.appendf("caller: %s", callerId); const char *localId = logctx->queryLocalId(); if (localId && *localId) { diff --git a/roxie/ccd/ccdprotocol.cpp b/roxie/ccd/ccdprotocol.cpp index 1cf57813385..fe0e1be6567 100644 --- a/roxie/ccd/ccdprotocol.cpp +++ b/roxie/ccd/ccdprotocol.cpp @@ -1795,13 +1795,6 @@ class RoxieSocketWorker : public ProtocolQueryWorker { mlResponseFmt = httpHelper.queryResponseMlFormat(); mlRequestFmt = httpHelper.queryRequestMlFormat(); - - const char *globalId = queryRequestGlobalIdHeader(httpHelper, logctx); - const char *callerId = queryRequestCallerIdHeader(httpHelper, logctx); - if (globalId && *globalId) - msgctx->setTransactionId(globalId, callerId, true); //logged and forwarded through SOAPCALL/HTTPCALL - else if (callerId && *callerId) - msgctx->setCallerId(callerId); //may not matter, but maintain old behavior } } @@ -1906,9 +1899,10 @@ class RoxieSocketWorker : public ProtocolQueryWorker uid = NULL; sanitizeQuery(queryPT, queryName, sanitizedText, httpHelper, uid, isBlind, isDebug); - if (uid) - msgctx->setTransactionId(uid, nullptr, false); - else + + msgctx->startSpan(uid, httpHelper.queryRequestHeaders()); + + if (!uid) uid = "-"; sink->checkAccess(peer, queryName, sanitizedText, isBlind); diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index fa6aeec7954..c7333050318 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -271,13 +271,13 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface { return ctx->isBlind(); } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { - ctx->setGlobalId(id, ep, pid); + ctx->setActiveSpan(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { - ctx->setCallerId(id); + return ctx->getClientHeaders(); } virtual const char *queryGlobalId() const { @@ -1347,15 +1347,16 @@ class CRoxieServerActivity : implements CInterfaceOf, impl return traceLevel; } - virtual void setGlobalId(const char *id, SocketEndpoint&ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { if (ctx) - ctx->setGlobalId(id, ep, pid); + ctx->setActiveSpan(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { if (ctx) - ctx->setCallerId(id); + return ctx->getClientHeaders(); + return nullptr; } virtual const char *queryGlobalId() const override { diff --git a/roxie/ccd/hpccprotocol.hpp b/roxie/ccd/hpccprotocol.hpp index 60cdb842bc7..21380c0a54d 100644 --- a/roxie/ccd/hpccprotocol.hpp +++ b/roxie/ccd/hpccprotocol.hpp @@ -41,8 +41,7 @@ interface IHpccProtocolMsgContext : extends IInterface virtual bool getIntercept() = 0; virtual void outputLogXML(IXmlStreamFlusher &out) = 0; virtual void writeLogXML(IXmlWriter &writer) = 0; - virtual void setTransactionId(const char *id, const char *caller, bool global) = 0; - virtual void setCallerId(const char *id) = 0; + virtual void startSpan(const char * uid, const IProperties * headers) = 0; }; interface IHpccProtocolResultsWriter : extends IInterface diff --git a/system/jlib/jlog.cpp b/system/jlib/jlog.cpp index 1086c1c58f8..e3f82189d26 100644 --- a/system/jlib/jlog.cpp +++ b/system/jlib/jlog.cpp @@ -2822,10 +2822,15 @@ void IContextLogger::logOperatorException(IException *E, const char *file, unsig } class CRuntimeStatisticCollection; +/* + This class is used to implement the default log context - especially used for engines that only support a single query at a time + */ + class DummyLogCtx : implements IContextLogger { private: - LogTrace logTrace; + Owned activeSpan; + public: DummyLogCtx() {} // It's a static object - we don't want to actually link-count it... @@ -2863,25 +2868,33 @@ class DummyLogCtx : implements IContextLogger { return 0; } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { - logTrace.setGlobalId(id); + activeSpan.set(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { - logTrace.setCallerId(id); + if (!activeSpan) + return nullptr; + return ::getClientHeaders(activeSpan); } virtual const char *queryGlobalId() const override { - return logTrace.queryGlobalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryGlobalId(); } virtual const char *queryCallerId() const override { - return logTrace.queryCallerId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryCallerId(); } virtual const char *queryLocalId() const override { - return logTrace.queryLocalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryLocalId(); } virtual const CRuntimeStatisticCollection &queryStats() const override { diff --git a/system/jlib/jlog.hpp b/system/jlib/jlog.hpp index a8f0af48ebd..efc0d327fbe 100644 --- a/system/jlib/jlog.hpp +++ b/system/jlib/jlog.hpp @@ -1253,12 +1253,12 @@ interface jlib_decl IContextLogger : extends IInterface virtual void mergeStats(const CRuntimeStatisticCollection &from) const = 0; virtual unsigned queryTraceLevel() const = 0; - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) = 0; virtual const char *queryGlobalId() const = 0; virtual const char *queryLocalId() const = 0; - virtual void setCallerId(const char *id) = 0; virtual const char *queryCallerId() const = 0; virtual const CRuntimeStatisticCollection & queryStats() const = 0; + virtual void setActiveSpan(ISpan * span) = 0; + virtual IProperties * getClientHeaders() const = 0; }; extern jlib_decl StringBuffer &appendGloballyUniqueId(StringBuffer &s); diff --git a/system/jlib/jprop.cpp b/system/jlib/jprop.cpp index d2d588a3881..62d37d0056d 100644 --- a/system/jlib/jprop.cpp +++ b/system/jlib/jprop.cpp @@ -386,6 +386,22 @@ extern jlib_decl IProperties *createProperties(const char *filename, bool nocase else return new CProperties(nocase); } + +IProperties *cloneProperties(const IProperties * source, bool nocase) +{ + Owned clone = createProperties(nocase); + if (source) + { + Owned iter = source->getIterator(); + ForEach(*iter) + { + const char * key = iter->getPropKey(); + clone->setProp(key, source->queryProp(key)); + } + } + return clone.getClear(); +} + static CProperties *sysProps = NULL; extern jlib_decl IProperties *querySystemProperties() diff --git a/system/jlib/jprop.hpp b/system/jlib/jprop.hpp index a30f42dd37f..e2c85a02fd4 100644 --- a/system/jlib/jprop.hpp +++ b/system/jlib/jprop.hpp @@ -72,6 +72,7 @@ interface IProperties : public IPropertiesOf { }; extern jlib_decl IProperties *createProperties(bool nocase = false); extern jlib_decl IProperties *createProperties(const char *filename, bool nocase = false); +extern jlib_decl IProperties *cloneProperties(const IProperties * properties, bool nocase = false); extern jlib_decl IProperties *querySystemProperties(); extern jlib_decl IProperties *getSystemProperties(); diff --git a/system/jlib/jtrace.cpp b/system/jlib/jtrace.cpp index a6f7c8d27ee..46f01033c30 100644 --- a/system/jlib/jtrace.cpp +++ b/system/jlib/jtrace.cpp @@ -48,59 +48,6 @@ namespace context = opentelemetry::context; namespace nostd = opentelemetry::nostd; namespace opentel_trace = opentelemetry::trace; -using namespace ln_uid; - -/* -* Sets global id if provided, and assign a localId -*/ -void LogTrace::setGlobalId(const char* id) -{ - if (!isEmptyString(id)) - { - globalId.set(id); - assignLocalId(); - } -} - -/* -* Sets global id if provided, assigns new localID -*/ -LogTrace::LogTrace(const char * globalId) -{ - setGlobalId(globalId); -} - -LogTrace::LogTrace() -{ - assignLocalId(); -} - -const char* LogTrace::assignLocalId() -{ - localId.set(createUniqueIdString().c_str()); - return localId.get(); -} - -const char* LogTrace::queryGlobalId() const -{ - return globalId.get(); -} - -void LogTrace::setCallerId(const char* id) -{ - callerId.set(id); -} - -const char* LogTrace::queryCallerId() const -{ - return callerId.get(); -} - -const char* LogTrace::queryLocalId() const -{ - return localId.get(); -} - class CHPCCHttpTextMapCarrier : public opentelemetry::context::propagation::TextMapCarrier { public: @@ -173,9 +120,11 @@ class CSpan : public CInterfaceOf out.append(",\"Name\":\"").append(name.get()).append("\""); if (!isEmptyString(hpccGlobalId.get())) - out.append(",\"HPCCGlobalID\":\"").append(hpccGlobalId.get()).append("\""); + out.append(",\"GlobalID\":\"").append(hpccGlobalId.get()).append("\""); if (!isEmptyString(hpccCallerId.get())) - out.append(",\"HPCCCallerID\":\"").append(hpccCallerId.get()).append("\""); + out.append(",\"CallerID\":\"").append(hpccCallerId.get()).append("\""); + if (!isEmptyString(hpccLocalId.get())) + out.append(",\"LocalID\":\"").append(hpccLocalId.get()).append("\""); if (span != nullptr) { @@ -289,8 +238,17 @@ class CSpan : public CInterfaceOf if (!isEmptyString(hpccGlobalId.get())) ctxProps->setProp(kGlobalIdHttpHeaderName, hpccGlobalId.get()); - if (!isEmptyString(hpccCallerId.get())) - ctxProps->setProp(kCallerIdHttpHeaderName, hpccCallerId.get()); + if (otelFormatted) + { + //The localid is passed as the callerid for the client request.... + if (!isEmptyString(hpccLocalId.get())) + ctxProps->setProp(kCallerIdHttpHeaderName, hpccLocalId.get()); + } + else + { + if (!isEmptyString(hpccCallerId.get())) + ctxProps->setProp(kCallerIdHttpHeaderName, hpccCallerId.get()); + } if (span == nullptr) return false; @@ -351,11 +309,57 @@ class CSpan : public CInterfaceOf return opentelemetry::trace::SpanContext::GetInvalid(); } + virtual void getLogPrefix(StringBuffer & out) const override + { + const char * caller = queryCallerId(); + const char * local = queryLocalId(); + bool hasCaller = !isEmptyString(caller); + bool hasLocal = !isEmptyString(local); + if (hasCaller || hasLocal) + { + out.append('['); + if (hasCaller) + out.appendf("caller:%s", caller); + + if (hasLocal) + { + if (hasCaller) + out.append(','); + out.appendf("local:%s", local); + } + out.append("]"); + } + } + const char * queryTraceName() const { return tracerName.get(); } + virtual const char* queryGlobalId() const override + { + //MORE: This should probably only be stored in the server context.... + if (localParentSpan && isEmptyString(hpccGlobalId)) + return localParentSpan->queryGlobalId(); + return hpccGlobalId.get(); + } + + virtual const char* queryCallerId() const override + { + //MORE: This should probably only be stored in the server context.... + if (localParentSpan && isEmptyString(hpccCallerId)) + return localParentSpan->queryCallerId(); + return hpccCallerId.get(); + } + + virtual const char* queryLocalId() const override + { + //MORE: This should probably only be stored in the server context.... + if (localParentSpan && isEmptyString(hpccLocalId)) + return localParentSpan->queryLocalId(); + return hpccLocalId.get(); + } + protected: CSpan(const char * spanName, CSpan * parent) { @@ -374,6 +378,10 @@ class CSpan : public CInterfaceOf void init() { + bool createLocalId = !isEmptyString(hpccGlobalId); + if (createLocalId) + hpccLocalId.set(ln_uid::createUniqueIdString().c_str()); + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); //what if tracerName is empty? @@ -392,6 +400,7 @@ class CSpan : public CInterfaceOf toLog(out); DBGLOG("Span start: {%s}", out.str()); } + } void storeSpanContext() @@ -467,6 +476,7 @@ class CSpan : public CInterfaceOf StringAttr spanID; StringAttr hpccGlobalId; StringAttr hpccCallerId; + StringAttr hpccLocalId; opentelemetry::trace::StartSpanOptions opts; nostd::shared_ptr span; @@ -537,7 +547,7 @@ class CServerSpan : public CSpan //Remote parent is declared via http headers from client call opentelemetry::v1::trace::SpanContext remoteParentSpanCtx = opentelemetry::trace::SpanContext::GetInvalid(); - void setSpanContext(StringArray & httpHeaders, const char kvDelineator = ':') + void setSpanContext(StringArray & httpHeaders, const char kvDelineator, SpanFlags flags) { Owned contextProps = createProperties(true); ForEachItemIn(currentHeaderIndex, httpHeaders) @@ -556,10 +566,10 @@ class CServerSpan : public CSpan contextProps->setProp(key, delineator + 1); } - setSpanContext(contextProps); + setSpanContext(contextProps, flags); } - void setSpanContext(const IProperties * httpHeaders) + void setSpanContext(const IProperties * httpHeaders, SpanFlags flags) { if (httpHeaders) { @@ -572,17 +582,17 @@ class CServerSpan : public CSpan hpccGlobalId.set(httpHeaders->queryProp(kGlobalIdHttpHeaderName)); else if (httpHeaders->hasProp(kLegacyGlobalIdHttpHeaderName)) hpccGlobalId.set(httpHeaders->queryProp(kLegacyGlobalIdHttpHeaderName)); - else - DBGLOG("ServerSpan: HPCCGlobalID not found in http headers"); + else if (hasMask(flags, SpanFlags::EnsureGlobalId) || queryTraceManager().alwaysCreateGlobalIds()) + { + StringBuffer generatedId; + appendGloballyUniqueId(generatedId); + hpccGlobalId.set(generatedId.str()); + } if (httpHeaders->hasProp(kCallerIdHttpHeaderName)) hpccCallerId.set(httpHeaders->queryProp(kCallerIdHttpHeaderName)); else if (httpHeaders->hasProp(kLegacyCallerIdHttpHeaderName)) hpccCallerId.set(httpHeaders->queryProp(kLegacyCallerIdHttpHeaderName)); - else - { - DBGLOG("ServerSpan: HPCCCallerID not provied"); - } const CHPCCHttpTextMapCarrier carrier(httpHeaders); auto globalPropegator = context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); @@ -597,8 +607,6 @@ class CServerSpan : public CSpan remoteParentSpanCtx = remoteParentSpan->GetContext(); opts.parent = remoteParentSpanCtx; } - else - WARNLOG("ServerSpan: Could not create remote parent span based on OTel parenttrace/tracestate headers"); } //Generate new HPCCGlobalID if not provided @@ -621,19 +629,19 @@ class CServerSpan : public CSpan } public: - CServerSpan(const char * spanName, const char * tracerName_, StringArray & httpHeaders) + CServerSpan(const char * spanName, const char * tracerName_, StringArray & httpHeaders, SpanFlags flags) : CSpan(spanName, tracerName_) { opts.kind = opentelemetry::trace::SpanKind::kServer; - setSpanContext(httpHeaders); + setSpanContext(httpHeaders, ':', flags); init(); } - CServerSpan(const char * spanName, const char * tracerName_, const IProperties * httpHeaders) + CServerSpan(const char * spanName, const char * tracerName_, const IProperties * httpHeaders, SpanFlags flags) : CSpan(spanName, tracerName_) { opts.kind = opentelemetry::trace::SpanKind::kServer; - setSpanContext(httpHeaders); + setSpanContext(httpHeaders, flags); init(); } @@ -667,10 +675,22 @@ class CServerSpan : public CSpan } }; +//--------------------------------------------------------------------------------------------------------------------- + +IProperties * getClientHeaders(const ISpan * span) +{ + Owned headers = createProperties(true); + span->getSpanContext(headers, true); // Return value is not helpful + return headers.getClear(); +} + +//--------------------------------------------------------------------------------------------------------------------- + class CTraceManager : implements ITraceManager, public CInterface { private: bool enabled = true; + bool optAlwaysCreateGlobalIds = false; StringAttr moduleName; //Initializes the global trace provider which is required for all Otel based tracing operations. @@ -778,20 +798,21 @@ class CTraceManager : implements ITraceManager, public CInterface global: tracing: #optional - tracing enabled by default disable: true #optional - disable OTel tracing + alwaysCreateGlobalIds : false #optional - should global ids always be created? exporter: #optional - Controls how trace data is exported/reported - type: OTLP #OS|OTLP|Prometheus|HPCC (default: no export, jlog entry) - endpoint: "localhost:4317" #exporter specific key/value pairs - useSslCredentials: true - sslCredentialsCACcert: "ssl-certificate" + type: OTLP #OS|OTLP|Prometheus|HPCC (default: no export, jlog entry) + endpoint: "localhost:4317" #exporter specific key/value pairs + useSslCredentials: true + sslCredentialsCACcert: "ssl-certificate" processor: #optional - Controls span processing style - type: batch #simple|batch (default: simple) + type: batch #simple|batch (default: simple) */ void initTracer(IPropertyTree * traceConfig) { try { - Owned testTree; #ifdef TRACECONFIGDEBUG + Owned testTree; if (!traceConfig || !traceConfig->hasProp("tracing")) { const char * simulatedGlobalYaml = R"!!(global: @@ -822,13 +843,18 @@ class CTraceManager : implements ITraceManager, public CInterface static nostd::shared_ptr noopProvider(new NoopTracerProvider); opentelemetry::trace::Provider::SetTracerProvider(noopProvider); enabled = false; - DBGLOG("OpenTel tracing diabled!!"); } else { initTracerProviderAndGlobalInternals(traceConfig); } + //Non open-telemetry tracing configuration + if (traceConfig) + { + optAlwaysCreateGlobalIds = traceConfig->getPropBool("@alwaysCreateGlobalIds", optAlwaysCreateGlobalIds); + } + // The global propagator should be set regardless of whether tracing is enabled or not. // Injects Context into and extracts it from carriers that travel in-band // across process boundaries. Encoding is expected to conform to the HTTP @@ -870,14 +896,14 @@ class CTraceManager : implements ITraceManager, public CInterface throw makeStringExceptionV(-1, "TraceManager must be intialized!"); } - ISpan * createServerSpan(const char * name, StringArray & httpHeaders) override + ISpan * createServerSpan(const char * name, StringArray & httpHeaders, SpanFlags flags) override { - return new CServerSpan(name, moduleName.get(), httpHeaders); + return new CServerSpan(name, moduleName.get(), httpHeaders, flags); } - ISpan * createServerSpan(const char * name, const IProperties * httpHeaders) override + ISpan * createServerSpan(const char * name, const IProperties * httpHeaders, SpanFlags flags) override { - return new CServerSpan(name, moduleName.get(), httpHeaders); + return new CServerSpan(name, moduleName.get(), httpHeaders, flags); } const char * getTracedComponentName() const override @@ -889,6 +915,11 @@ class CTraceManager : implements ITraceManager, public CInterface { return enabled; } + + virtual bool alwaysCreateGlobalIds() const + { + return optAlwaysCreateGlobalIds; + } }; static Singleton theTraceManager; diff --git a/system/jlib/jtrace.hpp b/system/jlib/jtrace.hpp index 2094dd9b14a..34ab6100f19 100644 --- a/system/jlib/jtrace.hpp +++ b/system/jlib/jtrace.hpp @@ -28,29 +28,12 @@ static constexpr const char *kCallerIdHttpHeaderName = "Caller-Id"; static constexpr const char *kLegacyGlobalIdHttpHeaderName = "HPCC-Global-Id"; static constexpr const char *kLegacyCallerIdHttpHeaderName = "HPCC-Caller-Id"; -class jlib_decl LogTrace +enum class SpanFlags : unsigned { -private: - StringAttr globalId; - StringAttr callerId; - StringAttr localId; - - const char* assignLocalId(); - -public: - - LogTrace(); - LogTrace(const char * globalId); - - const char* queryGlobalId() const; - const char* queryCallerId() const; - const char* queryLocalId() const; - - //can these be private with abstract methods exposed to create/set these values? - void setGlobalId(const char* id); - void setCallerId(const char* id); - void setLocalId(const char* id); + None = 0x000000000, + EnsureGlobalId = 0x000000001, }; +BITMASK_ENUM(SpanFlags); interface ISpan : extends IInterface { @@ -60,16 +43,25 @@ interface ISpan : extends IInterface virtual bool getSpanContext(IProperties * ctxProps, bool otelFormatted) const = 0; virtual void toString(StringBuffer & out) const = 0; virtual void toLog(StringBuffer & out) const = 0; + virtual void getLogPrefix(StringBuffer & out) const = 0; virtual ISpan * createClientSpan(const char * name) = 0; virtual ISpan * createInternalSpan(const char * name) = 0; + +//Old-style global/caller/local id interface functions + virtual const char* queryGlobalId() const = 0; + virtual const char* queryCallerId() const = 0; + virtual const char* queryLocalId() const = 0; }; +extern jlib_decl IProperties * getClientHeaders(const ISpan * span); + interface ITraceManager : extends IInterface { - virtual ISpan * createServerSpan(const char * name, StringArray & httpHeaders) = 0; - virtual ISpan * createServerSpan(const char * name, const IProperties * httpHeaders) = 0; + virtual ISpan * createServerSpan(const char * name, StringArray & httpHeaders, SpanFlags flags = SpanFlags::None) = 0; + virtual ISpan * createServerSpan(const char * name, const IProperties * httpHeaders, SpanFlags flags = SpanFlags::None) = 0; virtual bool isTracingEnabled() const = 0; + virtual bool alwaysCreateGlobalIds() const = 0; virtual const char * getTracedComponentName() const = 0; }; @@ -220,7 +212,6 @@ extern jlib_decl TraceFlags queryDefaultTraceFlags(); extern jlib_decl TraceFlags loadTraceFlags(const IPropertyTree * globals, const std::initializer_list & y, TraceFlags dft); - // Temporarily modify the trace context and/or flags for the current thread, for the lifetime of the LogContextScope object class jlib_decl LogContextScope diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 696472f604f..6dfc2ee75cd 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -1484,31 +1484,10 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded numChannels = 1; init(); - if (workunit->hasDebugValue("GlobalId")) - { - SCMStringBuffer txId; - workunit->getDebugValue("GlobalId", txId); - if (txId.length()) - { - SocketEndpoint thorEp; - thorEp.setLocalHost(getMachinePortBase()); - logctx->setGlobalId(txId.str(), thorEp, 0); - - SCMStringBuffer callerId; - workunit->getDebugValue("CallerId", callerId); - if (callerId.length()) - logctx->setCallerId(callerId.str()); + Owned traceHeaders = extractTraceDebugOptions(workunit); + Owned requestSpan = queryTraceManager().createServerSpan(workunit->queryWuid(), traceHeaders); + logctx->setActiveSpan(requestSpan); - VStringBuffer msg("GlobalId: %s", txId.str()); - workunit->getDebugValue("CallerId", txId); - if (txId.length()) - msg.append(", CallerId: ").append(txId.str()); - txId.set(logctx->queryLocalId()); - if (txId.length()) - msg.append(", LocalId: ").append(txId.str()); - logctx->CTXLOG("%s", msg.str()); - } - } resumed = WUActionResume == workunit->getAction(); fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT))); querySent = spillsSaved = false; diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index afcc66595aa..5d9dcf3d42d 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1679,29 +1679,9 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co init(); - if (workUnitInfo->hasProp("Debug/globalid")) - { - const char *globalId = workUnitInfo->queryProp("Debug/globalid"); - if (globalId && *globalId) - { - SocketEndpoint thorEp; - thorEp.setLocalHost(getMachinePortBase()); - - VStringBuffer msg("GlobalId: %s", globalId); - logctx->setGlobalId(globalId, thorEp, 0); - - const char *callerId = workUnitInfo->queryProp("debug/callerid"); - if (callerId && *callerId) - { - msg.append(", CallerId: ").append(callerId); - logctx->setCallerId(callerId); - } - const char *localId = logctx->queryLocalId(); - if (localId && *localId) - msg.append(", LocalId: ").append(localId); - logctx->CTXLOG("%s", msg.str()); - } - } + Owned traceHeaders = deserializeTraceDebugOptions(workUnitInfo->queryPropTree("Debug")); + Owned requestSpan = queryTraceManager().createServerSpan(wuid, traceHeaders); + logctx->setActiveSpan(requestSpan); oldNodeCacheMem = 0; slavemptag = _slavemptag; diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 7176e6856d1..cf864b32ca3 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -610,7 +610,7 @@ constexpr LogMsgCategory MCthorDetailedDebugInfo(MCdebugInfo(thorDetailedLogLeve class CThorContextLogger : public CSimpleInterfaceOf { unsigned traceLevel = 1; - LogTrace logTrace; + Owned activeSpan; mutable CRuntimeStatisticCollection stats; public: @@ -659,25 +659,33 @@ class CThorContextLogger : public CSimpleInterfaceOf { return traceLevel; } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override + virtual void setActiveSpan(ISpan * span) override { - logTrace.setGlobalId(id); + activeSpan.set(span); } - virtual void setCallerId(const char *id) override + virtual IProperties * getClientHeaders() const override { - logTrace.setCallerId(id); + if (!activeSpan) + return nullptr; + return ::getClientHeaders(activeSpan); } virtual const char *queryGlobalId() const override { - return logTrace.queryGlobalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryGlobalId(); } virtual const char *queryLocalId() const override { - return logTrace.queryLocalId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryLocalId(); } virtual const char *queryCallerId() const override { - return logTrace.queryCallerId(); + if (!activeSpan) + return nullptr; + return activeSpan->queryCallerId(); } virtual const CRuntimeStatisticCollection &queryStats() const override {