From deb82709519faaef84c00a691cce7744799a8991 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Mon, 25 Nov 2024 15:17:23 -0500 Subject: [PATCH] HPCC-32248 Add tracing to rowservice - Added opentelemetry tracing to rowservice Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- fs/dafilesrv/dafilesrv.cpp | 31 +++++++- fs/dafsserver/dafsserver.cpp | 140 ++++++++++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 6 deletions(-) diff --git a/fs/dafilesrv/dafilesrv.cpp b/fs/dafilesrv/dafilesrv.cpp index ffe2e689aea..5348917fd04 100644 --- a/fs/dafilesrv/dafilesrv.cpp +++ b/fs/dafilesrv/dafilesrv.cpp @@ -366,6 +366,23 @@ version: 1.0 detail: 100 )!!"; +IPropertyTree * loadConfigurationWithGlobalDefault(const char * defaultYaml, Owned& globalConfig, const char * * argv, const char * componentTag, const char * envPrefix) +{ + Owned componentDefault; + if (defaultYaml) + { + Owned defaultConfig = createPTreeFromYAMLString(defaultYaml, 0, ptr_ignoreWhiteSpace, nullptr); + componentDefault.set(defaultConfig->queryPropTree(componentTag)); + if (!componentDefault) + throw makeStringExceptionV(99, "Default configuration does not contain the tag %s", componentTag); + } + else + componentDefault.setown(createPTree(componentTag)); + + mergePTree(componentDefault, globalConfig); + + return loadConfiguration(componentDefault, argv, componentTag, envPrefix, nullptr, nullptr); +} int main(int argc, const char* argv[]) { @@ -373,6 +390,7 @@ int main(int argc, const char* argv[]) EnableSEHtoExceptionMapping(); #ifndef __64BIT__ + // Restrict stack sizes on 32-bit systems Thread::setDefaultStackSize(0x10000); // 64K stack (also set in windows DSP) #endif @@ -386,7 +404,17 @@ int main(int argc, const char* argv[]) StringBuffer componentName; // NB: bare-metal dafilesrv does not have a component specific xml - Owned config = loadConfiguration(defaultYaml, argv, "dafilesrv", "DAFILESRV", nullptr, nullptr); + Owned extractedGlobalConfig = createPTree("dafilesrv"); + +#ifndef _CONTAINERIZED + Owned env = getHPCCEnvironment(); + IPropertyTree* globalTracing = env->queryPropTree("Software/tracing"); + if (globalTracing != nullptr) + extractedGlobalConfig->addPropTree("tracing", globalTracing); +#endif + + // NB: bare-metal dafilesrv does not have a component specific xml, extracting relevant global configuration instead + Owned config = loadConfigurationWithGlobalDefault(defaultYaml, extractedGlobalConfig, argv, "dafilesrv", "DAFILESRV"); Owned keyPairInfo; // NB: not used in containerized mode // Get SSL Settings @@ -513,7 +541,6 @@ int main(int argc, const char* argv[]) IPropertyTree *dafileSrvInstance = nullptr; #ifndef _CONTAINERIZED - Owned env = getHPCCEnvironment(); Owned _dafileSrvInstance; if (env) { diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index bfd312ea814..9b322c42e46 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -162,6 +162,69 @@ static ISecureSocket *createSecureSocket(ISocket *sock, bool disableClientCertVe } #endif +//------------------------------------------------------------------------------ +// ActiveSpanScope Design Notes: +//------------------------------------------------------------------------------ +// ActiveSpanScope updates the threadActiveSpan when it is intstantiated +// and restores it to a user configurable previous ISpan when it leaves scope. +// ActiveSpanScope does not control its referenced ISpan's lifetime or ending. +// +// This design allows ActiveSpanScope to be used to update threadActiveSpan +// for long running ISpans that are time sliced, worked on from multiple threads, +// and / or passed between threads. In these cases multiple ActiveSpanScopes +// will be created over the lifetime of the referenced of the ISpan to represent +// a slice of work done towards that ISpan. +// +// Allowing the previous / restored ISpan to be configured allows for +// "disconnected" work on ISpans to be done. Where the previously active ISpan +// may not be the correct ISpan to restore when an ActiveSpanScope leaves scope. +// +// When an ActiveSpanScope is destroyed it will return the prevSpan to active, +// if and only if its span is still the threadActiveSpan. If this is not this +// implies that there is a conflicting ActiveSpanScope and that a code structure +// issue exists that needs to be addressed. A IERRLOG message will be added +// in debug builds for these cases. +//------------------------------------------------------------------------------ + +class ActiveSpanScope +{ +public: + // Captures current threadActiveSpan for prevSpan + ActiveSpanScope(ISpan * _ptr) : ActiveSpanScope(_ptr, queryThreadedActiveSpan()) {} + ActiveSpanScope(ISpan * _ptr, ISpan * _prev) : span(_ptr), prevSpan(_prev) + { + setThreadedActiveSpan(_ptr); + } + ActiveSpanScope(const ActiveSpanScope& rhs) = delete; + + ~ActiveSpanScope() + { + ISpan* current = queryThreadedActiveSpan(); + if (current != span) + { + const char* currSpanID = current != nullptr ? current->querySpanId() : "null"; + const char* expectedSpanID = span != nullptr ? span->querySpanId() : "null"; + + IERRLOG("~ActiveSpanScope: threadActiveSpan has changed unexpectedly, expected: %s actual: %s", expectedSpanID, currSpanID); + return; + } + + setThreadedActiveSpan(prevSpan); + } + + inline ISpan * operator -> () const { return span; } + inline operator ISpan *() const { return span; } + + inline ActiveSpanScope& operator=(ISpan * ptr) = delete; + inline ActiveSpanScope& operator=(const ActiveSpanScope& rhs) = delete; + + inline bool operator == (ISpan * _ptr) const { return span == _ptr; } + inline bool operator != (ISpan * _ptr) const { return span != _ptr; } +private: + ISpan * span = nullptr; + ISpan * prevSpan = nullptr; +}; + static void reportFailedSecureAccepts(const char *context, IException *exception, unsigned &numFailedConn, unsigned &timeLastLog) { numFailedConn++; @@ -839,6 +902,9 @@ class CRemoteRequest : public CSimpleInterfaceOf MemoryBuffer expandMb; Owned responseWriter; // for xml or json response + Owned requestSpan; + std::string requestTraceParent; + bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz) { size32_t sz = inMb.length()-inPos; @@ -1092,6 +1158,16 @@ class CRemoteRequest : public CSimpleInterfaceOf responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle"); } } + + ~CRemoteRequest() + { + if (requestSpan != nullptr) + { + requestSpan->setSpanStatusSuccess(true); + requestSpan->endSpan(); + } + } + OutputFormat queryFormat() const { return format; } unsigned __int64 queryReplyLimit() const { return replyLimit; } IRemoteActivity *queryActivity() const { return activity; } @@ -1099,6 +1175,37 @@ class CRemoteRequest : public CSimpleInterfaceOf void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb, CClientStats &stats) { + const char* fullTraceContext = requestTree->queryProp("_trace/traceparent"); + + // We only want to compare the trace-id & span-id, so remove the last "sampling" group + std::string traceParent = fullTraceContext ? fullTraceContext : ""; + traceParent = traceParent.substr(0,traceParent.find_last_of("-")); + + if (!traceParent.empty() && requestTraceParent != traceParent) + { + // Check to see if we have an existing span that needs to be closed out, this can happen + // when the span parent changes on the client side + if (requestSpan != nullptr) + { + requestSpan->setSpanStatusSuccess(true); + requestSpan->endSpan(); + } + + Owned traceHeaders = createProperties(); + traceHeaders->setProp("traceparent", fullTraceContext); + + std::string requestSpanName; + if (activity->queryIsReadActivity()) + requestSpanName = "ReadRequest"; + else + requestSpanName = "WriteRequest"; + + requestSpan.set(queryTraceManager().createServerSpan(requestSpanName.c_str(), traceHeaders)); + requestTraceParent = traceParent; + } + + ActiveSpanScope activeSpan(requestSpan.get()); + if (requestTree->hasProp("replyLimit")) replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024; @@ -3027,12 +3134,12 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface else { if (gc) - THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); break; // wait for rest via subsequent notifySelected's } } else if (gc) - THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); // to be here, implies handled full message, loop around to see if more on the wire. // will break out if nothing/partial. } @@ -4818,7 +4925,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface * } * } * } - * + * * fetch continuation: * { * "format" : "binary", @@ -4960,8 +5067,23 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface } case StreamCmd::CLOSE: { + OwnedSpanScope closeSpan; + const char* traceParent = requestTree->queryProp("_trace/traceparent"); + if (traceParent != nullptr) + { + Owned traceHeaders = createProperties(); + traceHeaders->setProp("traceparent", traceParent); + + closeSpan.set(queryTraceManager().createServerSpan("CloseRequest", traceHeaders)); + } + if (0 == cursorHandle) - throw createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command"); + { + IException* exception = createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command"); + closeSpan->recordException(exception); + throw exception; + } + IFileIO *dummy; checkFileIOHandle(cursorHandle, dummy, true); break; @@ -4990,6 +5112,16 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface { case StreamCmd::VERSION: { + OwnedSpanScope versionSpan; + const char* traceParent = requestTree->queryProp("_trace/traceparent"); + if (traceParent != nullptr) + { + Owned traceHeaders = createProperties(); + traceHeaders->setProp("traceparent", traceParent); + + versionSpan = queryTraceManager().createServerSpan("VersionRequest", traceHeaders); + } + if (outFmt_Binary == outputFormat) reply.append(DAFILESRV_VERSIONSTRING); else