Skip to content

Commit

Permalink
HPCC-30350 Add open telemetry support to roxie/hthor/thor
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Sep 28, 2023
1 parent 7800de0 commit 12afc8f
Show file tree
Hide file tree
Showing 20 changed files with 341 additions and 283 deletions.
1 change: 1 addition & 0 deletions common/thorhelper/roxiehelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class THORHELPER_API HttpHelper : public CInterface
return getContentTypeMlFormat();
}
IProperties *queryUrlParameters(){return parameters;}
const IProperties * queryRequestHeaders() const { return reqHeaders; }
bool validateHttpGetTarget(const char *target)
{
if (!target)
Expand Down
19 changes: 13 additions & 6 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1911,13 +1911,20 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
#endif
if (!isEmptyString(master->logctx.queryGlobalId()))
Owned<IProperties> traceHeaders = master->logctx.getClientHeaders();
if (traceHeaders)
{
if (!httpHeaderBlockContainsHeader(httpheaders, kGlobalIdHttpHeaderName))
request.append(kGlobalIdHttpHeaderName).append(": ").append(master->logctx.queryGlobalId()).append("\r\n");

if (!isEmptyString(master->logctx.queryLocalId()) && !httpHeaderBlockContainsHeader(httpheaders, kCallerIdHttpHeaderName))
request.append(kCallerIdHttpHeaderName).append(": ").append(master->logctx.queryLocalId()).append("\r\n"); //our localId is reciever's callerId
Owned<IPropertyIterator> iter = traceHeaders->getIterator();
ForEach(*iter)
{
const char * key = iter->getPropKey();
if (!httpHeaderBlockContainsHeader(httpheaders, key))
{
const char * value = traceHeaders->queryProp(key);
if (!isEmptyString(value))
request.append(key).append(": ").append(value).append("\r\n");
}
}
}

if (master->wscType == STsoap)
Expand Down
58 changes: 58 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14634,3 +14634,61 @@ bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const
throwUnexpected();
}
#endif


//The names of the debug options used to serialize trace info - lower case to ensure they also work on the property tree (in thor slaves)
static constexpr const char * traceDebugOptions[] = { "globalid", "callerid", "ottraceparent", "ottracestate" };
//The names of the headers containing the trace info
static constexpr const char * traceHeaderNames[] = { "global-id", "caller-id", "traceparent", "tracestate" };
static_assert(_elements_in(traceDebugOptions) == _elements_in(traceHeaderNames), "Inconsistent tracePropertyNames, traceHeaderNames arrays");

IProperties * extractTraceDebugOptions(IConstWorkUnit * source)
{
Owned<IProperties> target = createProperties(true);
SCMStringBuffer temp;
for (unsigned i=0; i < _elements_in(traceDebugOptions); i++)
{
const char * debugOption = traceDebugOptions[i];
const char * headerName = traceHeaderNames[i];
if (source->hasDebugValue(debugOption))
{
temp.clear();
source->getDebugValue(debugOption, temp);
target->setProp(headerName, temp.str());
}
}
return target.getClear();
}

IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions)
{
Owned<IProperties> target = createProperties(true);
if (debugOptions)
{
for (unsigned i=0; i < _elements_in(traceDebugOptions); i++)
{
const char * debugOption = traceDebugOptions[i];
const char * headerName = traceHeaderNames[i];
if (debugOptions->hasProp(debugOption))
{
const char * value = debugOptions->queryProp(debugOption);
target->setProp(headerName, value);
}
}
}
return target.getClear();
}

void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source)
{
for (unsigned i=0; i < _elements_in(traceDebugOptions); i++)
{
const char * headerName = traceHeaderNames[i];
const char * debugOption = traceDebugOptions[i];
if (source->hasProp(headerName))
{
const char * value = source->queryProp(headerName);
target->setDebugValue(debugOption, value, true);
}
}
}
4 changes: 4 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,10 @@ class WORKUNIT_API WorkUnitErrorReceiver : implements IErrorReceiver, public CIn
bool removeTimeStamp;
};

extern WORKUNIT_API IProperties * extractTraceDebugOptions(IConstWorkUnit * source);
extern WORKUNIT_API IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions);
extern WORKUNIT_API void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source);

extern WORKUNIT_API void addWorkunitException(IWorkUnit * wu, IError * error, bool removeTimeStamp);

inline bool isGlobalScope(const char * scope) { return scope && (streq(scope, GLOBAL_SCOPE) || streq(scope, LEGACY_GLOBAL_SCOPE)); }
Expand Down
27 changes: 3 additions & 24 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2130,30 +2130,9 @@ void EclAgent::runProcess(IEclProcess *process)
assertex(rowManager==NULL);
allocatorMetaCache.setown(createRowAllocatorCache(this));

if (queryWorkUnit()->hasDebugValue("GlobalId"))
{
SCMStringBuffer globalId;
queryWorkUnit()->getDebugValue("GlobalId", globalId);
if (globalId.length())
{
SocketEndpoint thorEp;
thorEp.setLocalHost(0);
updateDummyContextLogger().setGlobalId(globalId.str(), thorEp, GetCurrentProcessId());

VStringBuffer msg("GlobalId: %s", globalId.str());
SCMStringBuffer txId;
queryWorkUnit()->getDebugValue("CallerId", txId);
if (txId.length())
{
updateDummyContextLogger().setCallerId(txId.str());
msg.append(", CallerId: ").append(txId.str());
}
txId.set(updateDummyContextLogger().queryLocalId());
if (txId.length())
msg.append(", LocalId: ").append(txId.str());
updateDummyContextLogger().CTXLOG("%s", msg.str());
}
}
Owned<IProperties> traceHeaders = extractTraceDebugOptions(queryWorkUnit());
Owned<ISpan> requestSpan = queryTraceManager().createServerSpan(queryWorkUnit()->queryWuid(), traceHeaders);
updateDummyContextLogger().setActiveSpan(requestSpan);

// a component may specify an alternate name for the agent/workflow memory area,
// e.g. Thor specifies in "eclAgentMemory"
Expand Down
24 changes: 16 additions & 8 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface
mutable bool aborted;
mutable CIArrayOf<LogItem> log;
private:
LogTrace logTrace;
Owned<ISpan> activeSpan;
ContextLogger(const ContextLogger &); // Disable copy constructor
public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -742,25 +742,33 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface
{
stats.reset();
}
virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
virtual void setActiveSpan(ISpan * span) override
{
logTrace.setGlobalId(id);
activeSpan.set(span);
}
virtual void setCallerId(const char *id) override
virtual IProperties * getClientHeaders() const override
{
logTrace.setCallerId(id);
if (!activeSpan)
return nullptr;
return ::getClientHeaders(activeSpan);
}
virtual const char *queryGlobalId() const override
{
return logTrace.queryGlobalId();
if (!activeSpan)
return nullptr;
return activeSpan->queryGlobalId();
}
virtual const char *queryCallerId() const override
{
return logTrace.queryCallerId();
if (!activeSpan)
return nullptr;
return activeSpan->queryCallerId();
}
virtual const char *queryLocalId() const override
{
return logTrace.queryLocalId();
if (!activeSpan)
return nullptr;
return activeSpan->queryLocalId();
}
virtual const CRuntimeStatisticCollection & queryStats() const override
{
Expand Down
8 changes: 4 additions & 4 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1379,13 +1379,13 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
{
return logctx.queryTraceLevel();
}
virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
virtual void setActiveSpan(ISpan * span) override
{
const_cast<IRoxieContextLogger&>(logctx).setGlobalId(id, ep, pid);
const_cast<IRoxieContextLogger&>(logctx).setActiveSpan(span);
}
virtual void setCallerId(const char *id) override
virtual IProperties * getClientHeaders() const override
{
const_cast<IRoxieContextLogger&>(logctx).setCallerId(id);
return logctx.getClientHeaders();
}
virtual const char *queryGlobalId() const override
{
Expand Down
86 changes: 37 additions & 49 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,17 +1177,11 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
wu.setown(daliHelper->attachWorkunit(wuid.get()));
}
Owned<StringContextLogger> logctx = new StringContextLogger(wuid.get());
if (wu->hasDebugValue("GlobalId"))
{
SCMStringBuffer globalId;
SocketEndpoint ep;
ep.setLocalHost(0);
SCMStringBuffer callerId;
logctx->setGlobalId(wu->getDebugValue("GlobalId", globalId).str(), ep, GetCurrentProcessId());
wu->getDebugValue("CallerId", callerId);
if (callerId.length())
logctx->setCallerId(callerId.str());
}

Owned<IProperties> traceHeaders = extractTraceDebugOptions(wu);
Owned<ISpan> requestSpan = queryTraceManager().createServerSpan(wu->queryWuid(), traceHeaders);
logctx->setActiveSpan(requestSpan);

Owned<IQueryFactory> queryFactory;
try
{
Expand Down Expand Up @@ -1308,6 +1302,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
StringBuffer s;
logctx.getStats(s);

//MORE: logctx.queryActiveSpan()->getLogPrefix() or similar.
StringBuffer txidInfo;
const char *globalId = logctx.queryGlobalId();
if (globalId && *globalId)
Expand Down Expand Up @@ -1357,7 +1352,6 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
public:
StringAttr queryName;
StringAttr uid = "-";
StringAttr callerId;
Owned<CascadeManager> cascade;
Owned<IDebuggerContext> debuggerContext;
Owned<CDebugCommandHandler> debugCmdHandler;
Expand All @@ -1367,6 +1361,7 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
SocketEndpoint ep;
time_t startTime;
bool notedActive = false;
bool ensureGlobalIdExists = false;
public:
IMPLEMENT_IINTERFACE;

Expand Down Expand Up @@ -1415,9 +1410,7 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
IConstWorkUnit *workunit = queryFactory->queryWorkUnit();
if (workunit && workunit->getDebugValueBool("generateGlobalId", false) && isEmptyString(logctx->queryGlobalId()))
{
StringBuffer gen_id;
appendGloballyUniqueId(gen_id);
setTransactionId(gen_id.str(), nullptr, true);
ensureGlobalIdExists = true;
}
}
virtual void noteQueryActive()
Expand Down Expand Up @@ -1445,44 +1438,38 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
return *cascade;
}

virtual void setTransactionId(const char *id, const char *caller, bool global) override
virtual void startSpan(const char * id, const IProperties * headers) override
{
if (!id || !*id)
return;
uid.set(id);
ensureContextLogger();
if (!global && !isEmptyString(logctx->queryGlobalId())) //globalId wins
return;
StringBuffer s;
ep.getHostText(s).appendf(":%u{%s}", ep.port, uid.str()); //keep no matter what for existing log parsers
if (global)
Linked<const IProperties> allHeaders = headers;
SpanFlags flags = (ensureGlobalIdExists) ? SpanFlags::EnsureGlobalId : SpanFlags::None;
if (headers && !headers->queryProp("global-id"))
{
s.append('[');
logctx->setGlobalId(id, ep, 0);
if (caller && *caller)
//If an id is provided, and we are not automatically creating global ids, use the id as the global-id
if ((id && *id) && !ensureGlobalIdExists)
{
setCallerId(caller);
logctx->setCallerId(caller);
Owned<IProperties> clonedHeaders = cloneProperties(headers, true);
clonedHeaders->setProp("global-id", id);
allHeaders.setown(clonedHeaders.getClear());
}
if (callerId.length())
s.appendf("caller:%s", callerId.str());
}

const char *local = logctx->queryLocalId(); //generated in setGlobalId above
if (local && *local)
{
if (callerId.length())
s.append(',');
s.appendf("local:%s", local);
}
s.append("]");
ensureContextLogger();

Owned<ISpan> requestSpan = queryTraceManager().createServerSpan("request", allHeaders, flags);
logctx->setActiveSpan(requestSpan);

const char * globalId = requestSpan->queryGlobalId();
if (globalId)
id = globalId;

uid.set(id);
if (id)
{
StringBuffer s;
ep.getHostText(s).appendf(":%u{%s}", ep.port, id); //keep no matter what for existing log parsers
requestSpan->getLogPrefix(s);
logctx->set(s.str());
}
logctx->set(s.str());
}
virtual void setCallerId(const char *id)
{
if (!id || !*id)
return;
callerId.set(id);
}
inline IDebuggerContext &ensureDebuggerContext(const char *id)
{
Expand Down Expand Up @@ -1602,9 +1589,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
StringBuffer s;
logctx->getStats(s);

const char * callerId = logctx->queryCallerId();
StringBuffer txIds;
if (callerId.length())
txIds.appendf("caller: %s", callerId.str());
if (!isEmptyString(callerId))
txIds.appendf("caller: %s", callerId);
const char *localId = logctx->queryLocalId();
if (localId && *localId)
{
Expand Down
14 changes: 4 additions & 10 deletions roxie/ccd/ccdprotocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1795,13 +1795,6 @@ class RoxieSocketWorker : public ProtocolQueryWorker
{
mlResponseFmt = httpHelper.queryResponseMlFormat();
mlRequestFmt = httpHelper.queryRequestMlFormat();

const char *globalId = queryRequestGlobalIdHeader(httpHelper, logctx);
const char *callerId = queryRequestCallerIdHeader(httpHelper, logctx);
if (globalId && *globalId)
msgctx->setTransactionId(globalId, callerId, true); //logged and forwarded through SOAPCALL/HTTPCALL
else if (callerId && *callerId)
msgctx->setCallerId(callerId); //may not matter, but maintain old behavior
}
}

Expand Down Expand Up @@ -1906,9 +1899,10 @@ class RoxieSocketWorker : public ProtocolQueryWorker

uid = NULL;
sanitizeQuery(queryPT, queryName, sanitizedText, httpHelper, uid, isBlind, isDebug);
if (uid)
msgctx->setTransactionId(uid, nullptr, false);
else

msgctx->startSpan(uid, httpHelper.queryRequestHeaders());

if (!uid)
uid = "-";

sink->checkAccess(peer, queryName, sanitizedText, isBlind);
Expand Down
Loading

0 comments on commit 12afc8f

Please sign in to comment.