Skip to content

Commit

Permalink
HPCC-32248 Add tracing to rowservice
Browse files Browse the repository at this point in the history
- Added opentelemetry tracing to rowservice

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Nov 22, 2024
1 parent ab280fe commit 5426202
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 6 deletions.
30 changes: 28 additions & 2 deletions fs/dafilesrv/dafilesrv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,31 @@ version: 1.0
detail: 100
)!!";

IPropertyTree * loadConfigurationWithGlobalDefault(const char * defaultYaml, Owned<IPropertyTree>& globalConfig, const char * * argv, const char * componentTag, const char * envPrefix)
{
Owned<IPropertyTree> componentDefault;
if (defaultYaml)
{
Owned<IPropertyTree> defaultConfig = createPTreeFromYAMLString(defaultYaml, 0, ptr_ignoreWhiteSpace, nullptr);
componentDefault.set(defaultConfig->queryPropTree(componentTag));
if (!componentDefault)
throw makeStringExceptionV(99, "Default configuration does not contain the tag %s", componentTag);
}
else
componentDefault.setown(createPTree(componentTag));

mergePTree(componentDefault, globalConfig);

return loadConfiguration(componentDefault, argv, componentTag, envPrefix, nullptr, nullptr);
}

int main(int argc, const char* argv[])
{
InitModuleObjects();

EnableSEHtoExceptionMapping();
#ifndef __64BIT__

// Restrict stack sizes on 32-bit systems
Thread::setDefaultStackSize(0x10000); // 64K stack (also set in windows DSP)
#endif
Expand All @@ -386,7 +404,16 @@ int main(int argc, const char* argv[])
StringBuffer componentName;

// NB: bare-metal dafilesrv does not have a component specific xml
Owned<IPropertyTree> config = loadConfiguration(defaultYaml, argv, "dafilesrv", "DAFILESRV", nullptr, nullptr);
Owned<IPropertyTree> extractedGlobalConfig = createPTree("dafilesrv");

#ifndef _CONTAINERIZED
Owned<IPropertyTree> env = getHPCCEnvironment();
IPropertyTree* globalTracing= env->queryPropTree("Software/tracing");
extractedGlobalConfig->addPropTree("tracing", globalTracing);
#endif

// NB: bare-metal dafilesrv does not have a component specific xml, extracting relevant global configuration instead
Owned<IPropertyTree> config = loadConfigurationWithGlobalDefault(defaultYaml, extractedGlobalConfig, argv, "dafilesrv", "DAFILESRV");

Owned<IPropertyTree> keyPairInfo; // NB: not used in containerized mode
// Get SSL Settings
Expand Down Expand Up @@ -513,7 +540,6 @@ int main(int argc, const char* argv[])

IPropertyTree *dafileSrvInstance = nullptr;
#ifndef _CONTAINERIZED
Owned<IPropertyTree> env = getHPCCEnvironment();
Owned<IPropertyTree> _dafileSrvInstance;
if (env)
{
Expand Down
140 changes: 136 additions & 4 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,69 @@ static ISecureSocket *createSecureSocket(ISocket *sock, bool disableClientCertVe
}
#endif

//------------------------------------------------------------------------------
// ActiveSpanScope Design Notes:
//------------------------------------------------------------------------------
// ActiveSpanScope updates the threadActiveSpan when it is intstantiated
// and restores it to a user configurable previous ISpan when it leaves scope.
// ActiveSpanScope does not control its referenced ISpan's lifetime or ending.
//
// This design allows ActiveSpanScope to be used to update threadActiveSpan
// for long running ISpans that are time sliced, worked on from multiple threads,
// and / or passed between threads. In these cases multiple ActiveSpanScopes
// will be created over the lifetime of the referenced of the ISpan to represent
// a slice of work done towards that ISpan.
//
// Allowing the previous / restored ISpan to be configured allows for
// "disconnected" work on ISpans to be done. Where the previously active ISpan
// may not be the correct ISpan to restore when an ActiveSpanScope leaves scope.
//
// When an ActiveSpanScope is destroyed it will return the prevSpan to active,
// if and only if its span is still the threadActiveSpan. If this is not this
// implies that there is a conflicting ActiveSpanScope and that a code structure
// issue exists that needs to be addressed. A IERRLOG message will be added
// in debug builds for these cases.
//------------------------------------------------------------------------------

class ActiveSpanScope
{
public:
// Captures current threadActiveSpan for prevSpan
ActiveSpanScope(ISpan * _ptr) : ActiveSpanScope(_ptr, queryThreadedActiveSpan()) {}
ActiveSpanScope(ISpan * _ptr, ISpan * _prev) : span(_ptr), prevSpan(_prev)
{
setThreadedActiveSpan(_ptr);
}
ActiveSpanScope(const ActiveSpanScope& rhs) = delete;

~ActiveSpanScope()
{
ISpan* current = queryThreadedActiveSpan();
if (current != span)
{
const char* currSpanID = current != nullptr ? current->querySpanId() : "null";
const char* expectedSpanID = span != nullptr ? span->querySpanId() : "null";

IERRLOG("~ActiveSpanScope: threadActiveSpan has changed unexpectedly, expected: %s actual: %s", expectedSpanID, currSpanID);
return;
}

setThreadedActiveSpan(prevSpan);
}

inline ISpan * operator -> () const { return span; }
inline operator ISpan *() const { return span; }

inline ActiveSpanScope& operator=(ISpan * ptr) = delete;
inline ActiveSpanScope& operator=(const ActiveSpanScope& rhs) = delete;

inline bool operator == (ISpan * _ptr) const { return span == _ptr; }
inline bool operator != (ISpan * _ptr) const { return span != _ptr; }
private:
ISpan * span = nullptr;
ISpan * prevSpan = nullptr;
};

static void reportFailedSecureAccepts(const char *context, IException *exception, unsigned &numFailedConn, unsigned &timeLastLog)
{
numFailedConn++;
Expand Down Expand Up @@ -839,6 +902,9 @@ class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
MemoryBuffer expandMb;
Owned<IXmlWriterExt> responseWriter; // for xml or json response

Owned<ISpan> requestSpan;
std::string requestTraceParent;

bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz)
{
size32_t sz = inMb.length()-inPos;
Expand Down Expand Up @@ -1092,13 +1158,54 @@ class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
responseWriter->outputUInt(cursorHandle, sizeof(cursorHandle), "handle");
}
}

~CRemoteRequest()
{
if (requestSpan != nullptr)
{
requestSpan->setSpanStatusSuccess(true);
requestSpan->endSpan();
}
}

OutputFormat queryFormat() const { return format; }
unsigned __int64 queryReplyLimit() const { return replyLimit; }
IRemoteActivity *queryActivity() const { return activity; }
ICompressor *queryCompressor() const { return compressor; }

void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb, CClientStats &stats)
{
const char* fullTraceContext = requestTree->queryProp("_trace/traceparent");

// We only want to compare the trace-id & span-id, so remove the last "sampling" group
std::string traceParent = fullTraceContext ? fullTraceContext : "";
traceParent = traceParent.substr(0,traceParent.find_last_of("-"));

if (!traceParent.empty() && requestTraceParent != traceParent)
{
// Check to see if we have an existing span that needs to be closed out, this can happen
// when the span parent changes on the client side
if (requestSpan != nullptr)
{
requestSpan->setSpanStatusSuccess(true);
requestSpan->endSpan();
}

Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", fullTraceContext);

std::string requestSpanName;
if (activity->queryIsReadActivity())
requestSpanName = "ReadRequest";
else
requestSpanName = "WriteRequest";

requestSpan.set(queryTraceManager().createServerSpan(requestSpanName.c_str(), traceHeaders));
requestTraceParent = traceParent;
}

ActiveSpanScope activeSpan(requestSpan.get());

if (requestTree->hasProp("replyLimit"))
replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;

Expand Down Expand Up @@ -3027,12 +3134,12 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
else
{
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
break; // wait for rest via subsequent notifySelected's
}
}
else if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
// to be here, implies handled full message, loop around to see if more on the wire.
// will break out if nothing/partial.
}
Expand Down Expand Up @@ -4818,7 +4925,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
* }
* }
* }
*
*
* fetch continuation:
* {
* "format" : "binary",
Expand Down Expand Up @@ -4960,8 +5067,23 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
}
case StreamCmd::CLOSE:
{
OwnedSpanScope closeSpan;
const char* traceParent = requestTree->queryProp("_trace/traceparent");
if (traceParent != nullptr)
{
Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", traceParent);

closeSpan.set(queryTraceManager().createServerSpan("CloseRequest", traceHeaders));
}

if (0 == cursorHandle)
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command");
{
IException* exception = createDafsException(DAFSERR_cmdstream_protocol_failure, "cursor handle not supplied to 'close' command");
closeSpan->recordException(exception);
throw exception;
}

IFileIO *dummy;
checkFileIOHandle(cursorHandle, dummy, true);
break;
Expand Down Expand Up @@ -4990,6 +5112,16 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
{
case StreamCmd::VERSION:
{
OwnedSpanScope versionSpan;
const char* traceParent = requestTree->queryProp("_trace/traceparent");
if (traceParent != nullptr)
{
Owned<IProperties> traceHeaders = createProperties();
traceHeaders->setProp("traceparent", traceParent);

versionSpan = queryTraceManager().createServerSpan("VersionRequest", traceHeaders);
}

if (outFmt_Binary == outputFormat)
reply.append(DAFILESRV_VERSIONSTRING);
else
Expand Down

0 comments on commit 5426202

Please sign in to comment.