diff --git a/common/thorhelper/roxiehelper.ipp b/common/thorhelper/roxiehelper.ipp index 33eaa5f662c..5375bd523fb 100644 --- a/common/thorhelper/roxiehelper.ipp +++ b/common/thorhelper/roxiehelper.ipp @@ -59,6 +59,7 @@ enum TracingCategory class LogItem; interface IRoxieContextLogger : extends IContextLogger { + IRoxieContextLogger(IPropertyTree *cfg) : IContextLogger(cfg) {} // Override base interface with versions that add prefix // We could consider moving some or all of these down into IContextLogger virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0))) @@ -74,7 +75,13 @@ interface IRoxieContextLogger : extends IContextLogger getLogPrefix(prefix); CTXLOGaeva(E, file, line, prefix.str(), format, args); } - + virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const + { + va_list args; + va_start(args, format); + logOperatorExceptionVA(E, file, line, format, args); + va_end(args); + } virtual StringBuffer &getLogPrefix(StringBuffer &ret) const = 0; virtual bool isIntercepted() const = 0; virtual void CTXLOGa(TracingCategory category, const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *prefix, const char *text) const = 0; diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index 49a2ad6668f..2378dc4ae8b 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -18,7 +18,6 @@ #ifndef THORCOMMON_HPP #define THORCOMMON_HPP -#include "jlog.hpp" #include "jiface.hpp" #include "jcrc.hpp" #include "jlzw.hpp" @@ -673,108 +672,6 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext protected: ICodeContext * ctx; }; -class CThorBaseContextLogger : public CSimpleInterfaceOf -{ -protected: - unsigned traceLevel = 1; - LogTrace logTrace; - mutable CRuntimeStatisticCollection stats; - -public: - CThorBaseContextLogger() : stats(jhtreeCacheStatistics) - { - } - virtual void CTXLOG(const char *format, ...) const override __attribute__((format(printf,2,3))) - { - va_list args; - va_start(args, format); - CTXLOGva(MCdebugProgress, unknownJob, NoLogMsgCode, format, args); - va_end(args); - } - - virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0))) - { - VALOG(cat, job, code, format, args); - } - virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0))) - { - StringBuffer ss; - ss.append("ERROR"); - if (E) - ss.append(": ").append(E->errorCode()); - if (file) - ss.appendf(": %s(%d) ", file, line); - if (E) - E->errorMessage(ss.append(": ")); - if (format) - ss.append(": ").valist_appendf(format, args); - LOG(MCoperatorProgress, unknownJob, "%s", ss.str()); - } - virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override - { - stats.addStatisticAtomic(kind, value); - } - virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const override - { - stats.setStatistic(kind, value); - } - virtual void mergeStats(const CRuntimeStatisticCollection &from) const override - { - stats.merge(from); - } - virtual unsigned queryTraceLevel() const override - { - return traceLevel; - } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override - { - logTrace.setGlobalId(id); - } - virtual void setCallerId(const char *id) override - { - logTrace.setCallerId(id); - } - virtual const char *queryGlobalId() const override - { - return logTrace.queryGlobalId(); - } - virtual const char *queryLocalId() const override - { - return logTrace.queryLocalId(); - } - virtual const char *queryCallerId() const override - { - return logTrace.queryCallerId(); - } - virtual void setHttpIdHeaderNames(const char *global, const char *caller) override - { - logTrace.setHttpIdHeaderNames(global, caller); - } - virtual const char *queryGlobalIdHttpHeaderName() const override - { - return logTrace.queryGlobalIdHTTPHeaderName(); - } - virtual const char *queryCallerIdHttpHeaderName() const override - { - return logTrace.queryCallerIdHTTPHeaderName(); - } - virtual const CRuntimeStatisticCollection &queryStats() const override - { - return stats; - } - virtual void recordStatistics(IStatisticGatherer &progress) const override - { - stats.recordStatistics(progress, false); - } - void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) - { - previous.updateDelta(to, stats); - } - void reset() - { - stats.reset(); - } -}; extern THORHELPER_API bool isActivitySink(ThorActivityKind kind); extern THORHELPER_API bool isActivitySource(ThorActivityKind kind); diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..4c0d4f27521 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,7 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual IContextLogger & queryContextLogger() = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 0b0610c3adb..7bfd843c8f0 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -508,7 +508,8 @@ class EclAgentPluginCtx : public SimplePluginCtx //======================================================================================= EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler) - : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler) + : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), + contextLogger(agentTopology, jhtreeCacheStatistics, unknownJob) { isAborting = false; isStandAloneExe = false; diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..0519fb603b1 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -249,8 +249,11 @@ public: virtual double queryAgentMachineCost() const override { return ctx->queryAgentMachineCost(); - }; - + } + virtual IContextLogger & queryContextLogger() override + { + return ctx->queryContextLogger(); + } protected: IAgentContext * ctx; }; @@ -392,6 +395,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + CStatsContextLogger contextLogger; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +709,10 @@ public: { return agentMachineCost; } + virtual IContextLogger & queryContextLogger() + { + return contextLogger; + } }; //--------------------------------------------------------------------------- diff --git a/ecl/hthor/hthor.ipp b/ecl/hthor/hthor.ipp index 9ee40fcdd47..b76e2c1a214 100644 --- a/ecl/hthor/hthor.ipp +++ b/ecl/hthor/hthor.ipp @@ -3239,10 +3239,6 @@ protected: void onLimitExceeded(); }; -// improvement: override constructor to store setHttpIdHeaderNames, override CTXLOG & logOperatorExceptionVA to log LogMsgJobInfo -class CHThorContextLogger : public CThorBaseContextLogger -{ -}; #define MAKEFACTORY(NAME) \ extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind, EclGraph & _graph) \ diff --git a/ecl/hthor/hthorkey.cpp b/ecl/hthor/hthorkey.cpp index 434d4717d16..1060814b726 100644 --- a/ecl/hthor/hthorkey.cpp +++ b/ecl/hthor/hthorkey.cpp @@ -200,7 +200,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase { CHThorActivityBase::updateProgress(progress); StatsActivityScope scope(progress, activityId); - contextLogger.recordStatistics(progress); + agent.queryContextLogger().recordStatistics(progress); progress.addStatistic(StNumPostFiltered, queryPostFiltered()); } @@ -268,7 +268,6 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase unsigned keyIndexCacheIdx = 0; unsigned postFiltered; - CHThorContextLogger contextLogger; bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part. bool localSortKey = false; bool initializedFileInfo = false; @@ -418,7 +417,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u { Owned tlk = openKeyFile(df->queryPart(num)); verifyIndex(tlk); - Owned tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false); + Owned tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false); initManager(tlman, true); while(tlman->lookup(false) && (count<=limit)) { @@ -454,7 +453,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r verifyIndex(kidx); if (limit != (unsigned) -1) { - Owned kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &contextLogger, helper.hasNewSegmentMonitors(), false); + Owned kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false); initManager(kman, false); result += kman->checkCount(limit-result); } @@ -556,7 +555,7 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk) void CHThorIndexReadActivityBase::initPart() { assertex(!keyIndex->isTopLevelKey()); - klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &contextLogger, helper.hasNewSegmentMonitors(), false)); + klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false)); initManager(klManager, false); callback.setManager(klManager, nullptr); } @@ -590,7 +589,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart() if(!tlk) openTlk(); verifyIndex(tlk); - tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false)); + tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false)); initManager(tlManager, true); nextPartNumber = 0; return nextMultiPart(); @@ -3020,11 +3019,10 @@ class KeyedLookupPartHandler : extends ThreadedPartHandler, implements Owned manager; IAgentContext &agent; DistributedKeyLookupHandler * tlk; - CHThorContextLogger &contextLogger; public: IMPLEMENT_IINTERFACE; - KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger); + KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent); ~KeyedLookupPartHandler() { @@ -3074,7 +3072,6 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep Owned threadPool; IntArray subSizes; IAgentContext &agent; - CHThorContextLogger &contextLogger; void addFile(IDistributedFile &f) { @@ -3085,7 +3082,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep for (unsigned idx = 0; idx < numParts; idx++) { IDistributedFilePart *part = f.getPart(idx); - parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent, contextLogger)); + parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent)); } keyFiles.append(OLINK(f)); tlks.append(*f.getPart(numParts)); @@ -3095,8 +3092,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep public: IMPLEMENT_IINTERFACE; - DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger) - : owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger) + DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent) + : owner(_owner), file(f), agent(_agent) { threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory)); IDistributedSuperFile *super = f->querySuperFile(); @@ -3161,7 +3158,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep //Owned trans.setown(owner.getLayoutTranslator(&f)); owner.verifyIndex(&f, index, trans); - Owned manager = createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false); + Owned manager = createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false); managers.append(*manager.getLink()); } opened = true; @@ -3190,8 +3187,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; } }; -KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger) - : ThreadedPartHandler(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk), contextLogger(_contextLogger) +KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent) + : ThreadedPartHandler(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk) { } @@ -3200,7 +3197,7 @@ void KeyedLookupPartHandler::openPart() if(manager) return; Owned index = openKeyFile(*part); - manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false)); + manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false)); const IDynamicTransform * trans = tlk->queryRecordLayoutTranslator(); if(trans && !index->isTopLevelKey()) manager->setLayoutTranslator(trans); @@ -3215,14 +3212,13 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl IJoinProcessor &owner; IAgentContext &agent; bool opened; - CHThorContextLogger &contextLogger; public: IMPLEMENT_IINTERFACE; - MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger) - : file(f), owner(_owner), agent(_agent), opened(false), contextLogger(_contextLogger) + MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent) + : file(f), owner(_owner), agent(_agent), opened(false) { super = f->querySuperFile(); if (super) @@ -3281,7 +3277,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl { Owned index = openKeyFile(f.queryPart(0)); owner.verifyIndex(&f, index, trans); - manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false)); + manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false)); } else { @@ -3294,7 +3290,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl parts->addIndex(index.getLink()); } owner.verifyIndex(&f, index, trans); - manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &contextLogger, owner.hasNewSegmentMonitors(), false)); + manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false)); } if(trans) manager->setLayoutTranslator(trans); @@ -3401,7 +3397,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I Owned translator; RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified; bool isCodeSigned = false; - CHThorContextLogger contextLogger; public: CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) @@ -3965,9 +3960,9 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I mono = useMonolithic(*dFile); } if (mono) - lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent, contextLogger)); + lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent)); else - lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent, contextLogger)); + lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent)); agent.logFileAccess(dFile, "HThor", "READ", graph); } else @@ -3985,7 +3980,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I manager->finishSegmentMonitors(); manager->reset(); manager->resetCounts(); - contextLogger.reset(); } virtual void doneManager(IKeyManager * manager) @@ -4061,7 +4055,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I progress.addStatistic(StNumPreFiltered, prefiltered); progress.addStatistic(StNumPostFiltered, postfiltered); progress.addStatistic(StNumIndexSkips, skips); - contextLogger.recordStatistics(progress); + agent.queryContextLogger().recordStatistics(progress); } protected: diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 7fa2aaea244..c29e0d6f8fa 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -591,12 +591,11 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface mutable bool aborted; mutable CIArrayOf log; private: - LogTrace logTrace; ContextLogger(const ContextLogger &); // Disable copy constructor public: IMPLEMENT_IINTERFACE; - ContextLogger() : stats(accumulatedStatistics, true) + ContextLogger() : IRoxieContextLogger(topology), stats(accumulatedStatistics, true) { ctxTraceLevel = traceLevel; intercept = false; @@ -604,8 +603,6 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface start = msTick(); channel = 0; aborted = false; - if ( topology && topology->hasProp("@httpGlobalIdHeader")) - setHttpIdHeaderNames(topology->queryProp("@httpGlobalIdHeader"), topology->queryProp("@httpCallerIdHeader")); } void outputXML(IXmlStreamFlusher &out) @@ -737,7 +734,10 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface { merged.merge(stats); } - + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const override + { + previous.updateDelta(to, stats); + } virtual unsigned queryTraceLevel() const { return ctxTraceLevel; @@ -746,38 +746,6 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface { stats.reset(); } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override - { - logTrace.setGlobalId(id); - } - virtual void setCallerId(const char *id) override - { - logTrace.setCallerId(id); - } - virtual const char *queryGlobalId() const override - { - return logTrace.queryGlobalId(); - } - virtual const char *queryCallerId() const override - { - return logTrace.queryCallerId(); - } - virtual const char *queryLocalId() const override - { - return logTrace.queryLocalId(); - } - virtual void setHttpIdHeaderNames(const char *global, const char *caller) override - { - logTrace.setHttpIdHeaderNames(global, caller); - } - virtual const char *queryGlobalIdHttpHeaderName() const override - { - return logTrace.queryGlobalIdHTTPHeaderName(); - } - virtual const char *queryCallerIdHttpHeaderName() const override - { - return logTrace.queryCallerIdHTTPHeaderName(); - } virtual const CRuntimeStatisticCollection & queryStats() const override { return stats; diff --git a/roxie/ccd/ccdcontext.cpp b/roxie/ccd/ccdcontext.cpp index 0cdef81b791..38b7423fa30 100644 --- a/roxie/ccd/ccdcontext.cpp +++ b/roxie/ccd/ccdcontext.cpp @@ -1179,7 +1179,7 @@ class InlineXmlDataReader : public WorkUnitRowReaderBase //--------------------------------------------------------------------------------------- -static const StatisticsMapping roxieGraphStatistics({}); +static const StatisticsMapping graphStatistics({}); class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback, public CInterface { protected: @@ -1266,7 +1266,7 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext public: IMPLEMENT_IINTERFACE; CRoxieContextBase(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx) - : factory(_factory), options(factory->queryOptions()), logctx(_logctx), globalStats(roxieGraphStatistics) + : IRoxieAgentContext(topology), factory(_factory), options(factory->queryOptions()), logctx(_logctx), globalStats(graphStatistics) { startTime = lastWuAbortCheck = msTick(); persists = NULL; @@ -1332,6 +1332,10 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext { globalStats.recordStatistics(progress, false); } + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const override + { + previous.updateDelta(to, globalStats); + } virtual void CTXLOGa(TracingCategory category, const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *prefix, const char *text) const override { logctx.CTXLOGa(category, cat, job, code, prefix, text); diff --git a/roxie/ccd/ccdcontext.hpp b/roxie/ccd/ccdcontext.hpp index 9ce90728f71..8365dbbf7a0 100644 --- a/roxie/ccd/ccdcontext.hpp +++ b/roxie/ccd/ccdcontext.hpp @@ -45,6 +45,7 @@ interface IRoxieServerContext; interface IRoxieAgentContext : extends IRoxieContextLogger { + IRoxieAgentContext(IPropertyTree *cfg) : IRoxieContextLogger(cfg) {} virtual ICodeContext *queryCodeContext() = 0; virtual void checkAbort() = 0; virtual void notifyAbort(IException *E) = 0; diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 8f73b8b2a6f..474cf4041f4 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -171,7 +171,7 @@ class RestartableThread : public CInterface class IndirectAgentContext : implements IRoxieAgentContext, public CInterface { public: - IndirectAgentContext(IRoxieAgentContext * _ctx) : ctx(_ctx) {} + IndirectAgentContext(IRoxieAgentContext * _ctx) : IRoxieAgentContext(nullptr), ctx(_ctx) {} IMPLEMENT_IINTERFACE // void set(IRoxieAgentContext * _ctx) { ctx = _ctx; } @@ -236,6 +236,10 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface { ctx->recordStatistics(progress); } + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const override + { + ctx->updateStatsDeltaTo(to, previous); + } virtual bool collectingDetailedStatistics() const { return ctx->collectingDetailedStatistics(); @@ -1086,7 +1090,7 @@ class CRoxieServerActivity : implements CInterfaceOf, impl IMPLEMENT_IINTERFACE_USING(CInterfaceOf) CRoxieServerActivity(IRoxieAgentContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager) - : basehelper(_factory->getHelper()), + : IRoxieContextLogger(topology), basehelper(_factory->getHelper()), ctx(_ctx), interceptedCtx(*this), factory(_factory), @@ -1111,7 +1115,7 @@ class CRoxieServerActivity : implements CInterfaceOf, impl } CRoxieServerActivity(IRoxieAgentContext *_ctx, IHThorArg & _helper) - : basehelper(_helper), ctx(_ctx), + : IRoxieContextLogger(topology), basehelper(_helper), ctx(_ctx), interceptedCtx(*this), factory(NULL), stats(allStatistics) { activityId = 0; @@ -1236,6 +1240,11 @@ class CRoxieServerActivity : implements CInterfaceOf, impl CriticalBlock b(statscrit); return stats.toStr(ret); } + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const override + { + CriticalBlock b(statscrit); + previous.updateDelta(to, stats); + } virtual ISectionTimer *registerTimer(unsigned _activityId, const char * name) { CriticalBlock b(statscrit); // reuse statscrit to protect functionTimers - it will not be held concurrently diff --git a/system/jlib/jlog.cpp b/system/jlib/jlog.cpp index 5aeffbf7c7b..d8c090d4759 100644 --- a/system/jlib/jlog.cpp +++ b/system/jlib/jlog.cpp @@ -2770,16 +2770,17 @@ void SysLogMsgHandler::addToPTree(IPropertyTree * tree) const tree->addPropTree("handler", handlerTree); } -// Default implementations of the functions in IContextLogger interface - void IContextLogger::CTXLOG(const char *format, ...) const { va_list args; va_start(args, format); - CTXLOGva(MCdebugInfo, unknownJob, NoLogMsgCode, format, args); + CTXLOGva(MCdebugInfo, logMsgJobInfo, NoLogMsgCode, format, args); va_end(args); } - +void IContextLogger::CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const +{ + VALOG(cat, job, code, format, args); +} void IContextLogger::mCTXLOG(const char *format, ...) const { va_list args; @@ -2812,7 +2813,6 @@ void IContextLogger::mCTXLOG(const char *format, ...) const ++cursor; } } - void IContextLogger::logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const { va_list args; @@ -2820,23 +2820,37 @@ void IContextLogger::logOperatorException(IException *E, const char *file, unsig logOperatorExceptionVA(E, file, line, format, args); va_end(args); } +void IContextLogger::logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const +{ + StringBuffer ss; + ss.append("ERROR"); + if (E) + ss.append(": ").append(E->errorCode()); + if (file) + ss.appendf(": %s(%d) ", file, line); + if (E) + E->errorMessage(ss.append(": ")); + if (format) + ss.append(": ").valist_appendf(format, args); + LOG(MCoperatorProgress, logMsgJobInfo, "%s", ss.str()); +} class CRuntimeStatisticCollection; -class DummyLogCtx : implements IContextLogger +class DummyLogCtx : implements IContextLogger, public CInterface { -private: - LogTrace logTrace; public: - DummyLogCtx() {} + DummyLogCtx(): IContextLogger(nullptr) {} // It's a static object - we don't want to actually link-count it... virtual void Link() const {} virtual bool Release() const { return false; } - + virtual void CTXLOG(const char *format, ...) const __attribute__((format(printf, 2, 3))) {} + virtual void mCTXLOG(const char *format, ...) const __attribute__((format(printf, 2, 3))) {} + void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const __attribute__((format(printf, 5, 6))) {} virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0))) { VALOG(cat, job, code, format, args); } - virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0))) + virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const override __attribute__((format(printf,5,0))) { StringBuffer ss; ss.append("ERROR"); @@ -2863,38 +2877,6 @@ class DummyLogCtx : implements IContextLogger { return 0; } - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override - { - logTrace.setGlobalId(id); - } - virtual void setCallerId(const char *id) override - { - logTrace.setCallerId(id); - } - virtual const char *queryGlobalId() const override - { - return logTrace.queryGlobalId(); - } - virtual const char *queryCallerId() const override - { - return logTrace.queryCallerId(); - } - virtual const char *queryLocalId() const override - { - return logTrace.queryLocalId(); - } - virtual void setHttpIdHeaderNames(const char *global, const char *caller) override - { - logTrace.setHttpIdHeaderNames(global, caller); - } - virtual const char *queryGlobalIdHttpHeaderName() const override - { - return logTrace.queryGlobalIdHTTPHeaderName(); - } - virtual const char *queryCallerIdHttpHeaderName() const override - { - return logTrace.queryCallerIdHTTPHeaderName(); - } virtual const CRuntimeStatisticCollection &queryStats() const override { throwUnexpected(); @@ -2903,6 +2885,10 @@ class DummyLogCtx : implements IContextLogger { throwUnexpected(); } + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const + { + throwUnexpected(); + } } dummyContextLogger; extern jlib_decl const IContextLogger &queryDummyContextLogger() diff --git a/system/jlib/jlog.hpp b/system/jlib/jlog.hpp index 30c5516ba43..7eeaf87250a 100644 --- a/system/jlib/jlog.hpp +++ b/system/jlib/jlog.hpp @@ -1243,26 +1243,101 @@ extern jlib_decl void AuditSystemAccess(const char *userid, bool success, char c interface jlib_decl IContextLogger : extends IInterface { + const LogMsgJobInfo & logMsgJobInfo; + LogTrace logTrace; +public: + IContextLogger(const IPropertyTree *cfg, const LogMsgJobInfo & _logMsgJobInfo=unknownJob) : logMsgJobInfo(_logMsgJobInfo) + { + if (cfg && cfg->hasProp("@httpGlobalIdHeader")) + logTrace.setHttpIdHeaderNames(cfg->queryProp("@httpGlobalIdHeader"), cfg->queryProp("@httpCallerIdHeader")); + } + virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) + { + logTrace.setGlobalId(id); + } + virtual void setCallerId(const char *id) + { + logTrace.setCallerId(id); + } + virtual const char *queryGlobalId() const + { + return logTrace.queryGlobalId(); + } + virtual const char *queryLocalId() const + { + return logTrace.queryLocalId(); + } + virtual const char *queryCallerId() const + { + return logTrace.queryCallerId(); + } + virtual void setHttpIdHeaderNames(const char *global, const char *caller) + { + logTrace.setHttpIdHeaderNames(global, caller); + } + virtual const char *queryGlobalIdHttpHeaderName() const + { + return logTrace.queryGlobalIdHTTPHeaderName(); + } + virtual const char *queryCallerIdHttpHeaderName() const + { + return logTrace.queryCallerIdHTTPHeaderName(); + } + virtual void CTXLOG(const char *format, ...) const __attribute__((format(printf, 2, 3))); virtual void mCTXLOG(const char *format, ...) const __attribute__((format(printf, 2, 3))); - virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const __attribute__((format(printf,5,0))) = 0; - void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const __attribute__((format(printf, 5, 6))); - virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0))) = 0; + virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const __attribute__((format(printf,5,0))); + virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const __attribute__((format(printf, 5, 6))); + virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0))); virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const = 0; virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const = 0; virtual void mergeStats(const CRuntimeStatisticCollection &from) const = 0; virtual unsigned queryTraceLevel() const = 0; - virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) = 0; - virtual void setHttpIdHeaderNames(const char *global, const char *caller) = 0; - virtual const char *queryGlobalId() const = 0; - virtual const char *queryLocalId() const = 0; - virtual const char *queryGlobalIdHttpHeaderName() const = 0; - virtual const char *queryCallerIdHttpHeaderName() const = 0; - virtual void setCallerId(const char *id) = 0; - virtual const char *queryCallerId() const = 0; virtual const CRuntimeStatisticCollection & queryStats() const = 0; virtual void recordStatistics(IStatisticGatherer &progress) const = 0; + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const = 0; +}; + +class CStatsContextLogger : implements IContextLogger, public CInterface +{ +protected: + unsigned traceLevel = 1; + mutable CRuntimeStatisticCollection stats; + +public: + IMPLEMENT_IINTERFACE; + CStatsContextLogger(const IPropertyTree *cfg, const StatisticsMapping & mapping, const LogMsgJobInfo & _logMsgJobInfo=unknownJob) : IContextLogger(cfg, _logMsgJobInfo), stats(mapping) + { + } + virtual unsigned queryTraceLevel() const override + { + return traceLevel; + } + virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override + { + stats.addStatisticAtomic(kind, value); + } + virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const override + { + stats.setStatistic(kind, value); + } + virtual void mergeStats(const CRuntimeStatisticCollection &from) const override + { + stats.merge(from); + } + virtual const CRuntimeStatisticCollection &queryStats() const override + { + return stats; + } + virtual void recordStatistics(IStatisticGatherer &progress) const override + { + stats.recordStatistics(progress, false); + } + virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous) const override + { + previous.updateDelta(to, stats); + } }; extern jlib_decl StringBuffer &appendGloballyUniqueId(StringBuffer &s); diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index f052f970eab..1060cf46770 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1319,27 +1319,10 @@ bool StatisticsMapping::equals(const StatisticsMapping & other) } // stat. mappings shared between master and slave activities -const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile}); const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks, StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches, StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles}); -const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked}); -const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); -const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); -const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); -const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); -const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); -const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); -const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics); -const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics); -const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); -const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); -const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); -const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); -const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); - const StatisticsMapping allStatistics(StKindAll); const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans}); const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDiskRead, StNumDiskReads, StCycleDiskWriteIOCycles, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 3415f5a096c..f1cae8ef5dc 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -530,6 +530,7 @@ class jlib_decl CRuntimeStatistic RelaxedAtomic value; }; +interface IContextLogger; class CNestedRuntimeStatisticMap; //The CRuntimeStatisticCollection used to gather statistics for an activity - it has no notion of its scope, but can contain nested scopes. @@ -943,22 +944,6 @@ extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); //statistics gathered by the different activities -extern jlib_decl const StatisticsMapping spillStatistics; extern jlib_decl const StatisticsMapping jhtreeCacheStatistics; -extern jlib_decl const StatisticsMapping basicActivityStatistics; -extern jlib_decl const StatisticsMapping groupActivityStatistics; -extern jlib_decl const StatisticsMapping hashJoinActivityStatistics; -extern jlib_decl const StatisticsMapping indexReadActivityStatistics; -extern jlib_decl const StatisticsMapping indexWriteActivityStatistics; -extern jlib_decl const StatisticsMapping joinActivityStatistics; -extern jlib_decl const StatisticsMapping keyedJoinActivityStatistics; -extern jlib_decl const StatisticsMapping lookupJoinActivityStatistics; -extern jlib_decl const StatisticsMapping loopActivityStatistics; -extern jlib_decl const StatisticsMapping diskReadActivityStatistics; -extern jlib_decl const StatisticsMapping diskReadPartStatistics; -extern jlib_decl const StatisticsMapping diskWriteActivityStatistics; -extern jlib_decl const StatisticsMapping sortActivityStatistics; - -extern jlib_decl const StatisticsMapping graphStatistics; -extern jlib_decl const StatisticsMapping indexDistribActivityStatistics; + #endif diff --git a/system/jlib/jtrace.hpp b/system/jlib/jtrace.hpp index 947ee6f8904..df158c7270b 100644 --- a/system/jlib/jtrace.hpp +++ b/system/jlib/jtrace.hpp @@ -201,7 +201,6 @@ extern jlib_decl TraceFlags loadTraceFlags(const IPropertyTree * globals, const // Temporarily modify the trace context and/or flags for the current thread, for the lifetime of the LogContextScope object -interface IContextLogger; class jlib_decl LogContextScope { public: diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 33723ba94fb..778fa104da8 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2454,7 +2454,7 @@ class ReDistributeSlaveActivity : public HashDistributeSlaveActivity class IndexDistributeSlaveActivity : public HashDistributeSlaveBase { typedef HashDistributeSlaveBase PARENT; - CThorContextLogger contextLogger; + CStatsContextLogger contextLogger; CStatsCtxLoggerDeltaUpdater statsUpdater; class CKeyLookup : implements IHash @@ -2487,7 +2487,7 @@ class IndexDistributeSlaveActivity : public HashDistributeSlaveBase } *lookup; public: - IndexDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container, indexDistribActivityStatistics), lookup(NULL), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + IndexDistributeSlaveActivity(CGraphElementBase *container) : contextLogger(globals, jhtreeCacheStatistics, thorJob), PARENT(container, indexDistribActivityStatistics), lookup(NULL), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) { } ~IndexDistributeSlaveActivity() diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index 802e0f8a412..b023542db0e 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -78,7 +78,7 @@ class CIndexReadSlaveBase : public CSlaveActivity Owned lazyIFileIO; mutable CriticalSection ioStatsCS; unsigned fileTableStart = NotFound; - CThorContextLogger contextLogger; + CStatsContextLogger contextLogger; CStatsCtxLoggerDeltaUpdater statsUpdater; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface @@ -529,7 +529,7 @@ class CIndexReadSlaveBase : public CSlaveActivity public: CIndexReadSlaveBase(CGraphElementBase *container) : CSlaveActivity(container, indexReadActivityStatistics), callback(*this), - statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + statsUpdater(jhtreeCacheStatistics, *this, contextLogger), contextLogger(globals, jhtreeCacheStatistics, thorJob) { helper = (IHThorIndexReadBaseArg *)container->queryHelper(); limitTransformExtra = nullptr; diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp index ff066dde087..723c384324e 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp @@ -1206,7 +1206,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem unsigned candidateCount; __int64 lastSeeks, lastScans; IConstPointerArrayOf translators; - CThorContextLogger contextLogger; + CStatsContextLogger contextLogger; inline void updateJhTreeStats() { @@ -1253,7 +1253,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CKeyLocalLookup(CKeyedJoinSlave &_owner, const RtlRecord &_keyRecInfo) : owner(_owner), keyRecInfo(_keyRecInfo), indexReadFieldsRow(_owner.indexInputAllocator) + CKeyLocalLookup(CKeyedJoinSlave &_owner, const RtlRecord &_keyRecInfo) : contextLogger(globals, jhtreeCacheStatistics, thorJob), owner(_owner), keyRecInfo(_keyRecInfo), indexReadFieldsRow(_owner.indexInputAllocator) { tlkManager.setown(owner.keyHasTlk ? createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, owner.helper->hasNewSegmentMonitors(), false) : nullptr); reset(); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index b0f3b15c070..0eb6a44255c 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -2222,7 +2222,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CPartDescriptorArray allIndexParts; std::vector localIndexParts, localFetchPartMap; IArrayOf tlkKeyIndexes; - CThorContextLogger contextLogger; + CStatsContextLogger contextLogger; CStatsCtxLoggerDeltaUpdater statsUpdater; Owned joinFieldsAllocator; OwnedConstThorRow defaultRight; @@ -2938,7 +2938,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem public: IMPLEMENT_IINTERFACE_USING(PARENT); - CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) + CKeyedJoinSlave(CGraphElementBase *_container) : contextLogger(globals, jhtreeCacheStatistics, thorJob), PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), statsUpdater(jhtreeCacheStatistics, *this, contextLogger) { helper = static_cast (queryHelper()); reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename); diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index 7c98f60872f..41b1bd54f2f 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2743,7 +2743,7 @@ void CJobBase::init() forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0); forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0); - logctx.setown(new CThorContextLogger()); + logctx.setown(new CStatsContextLogger(globals, jhtreeCacheStatistics, thorJob)); // helpers to preserve legacy behaviour of a few 'expert' properties that could be set as attributes directly under ThorCluster auto getLegacyExpertSettingBool = [this](const char *property, bool dft) diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 9f6d557f975..283b7d9fcc3 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -334,10 +334,10 @@ class CStatsCtxLoggerDeltaUpdater : public CStatsDeltaUpdater { protected: CSlaveActivity &activity; - CThorContextLogger &ctxLogger; + IContextLogger &ctxLogger; public: - inline CStatsCtxLoggerDeltaUpdater(const StatisticsMapping &mapping, CSlaveActivity &_activity, CThorContextLogger &_ctxLogger, unsigned timeThresholdSecs=0) + inline CStatsCtxLoggerDeltaUpdater(const StatisticsMapping &mapping, CSlaveActivity &_activity, IContextLogger &_ctxLogger, unsigned timeThresholdSecs=0) : CStatsDeltaUpdater(mapping, timeThresholdSecs), activity(_activity), ctxLogger(_ctxLogger) { resetStart(); diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index e1be240e844..9f9cd8ddad0 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -115,7 +115,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, unsigned maxCachedKJManagers = defaultMaxCachedKJManagers; unsigned maxCachedFetchContexts = defaultMaxCachedFetchContexts; unsigned keyLookupMaxProcessThreads = defaultKeyLookupMaxProcessThreads; - CThorContextLogger contextLogger; + CStatsContextLogger contextLogger; class CLookupKey { unsigned hashv = 0; @@ -1189,7 +1189,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, public: IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf); - CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag) + CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag), contextLogger(globals, jhtreeCacheStatistics, unknownJob) { setupProcessorPool(); } diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 5f8c6764de9..0f2bf09c777 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -72,6 +72,24 @@ mptag_t kjServiceMpTag; Owned globals; static Owned ClusterMPAllocator; +// stat. mappings shared between master and slave activities +const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile}); +const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked}); +const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); +const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); +const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); +const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); +const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics); +const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics); +const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); +const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); +const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); +const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); +const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); + MODULE_INIT(INIT_PRIORITY_STANDARD) { ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT)); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 126df4c97ca..372a5b74705 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -130,6 +130,26 @@ enum RegistryCode:unsigned { rc_register, rc_deregister }; #define destroyThorRow(ptr) free(ptr) #define reallocThorRow(ptr, size) realloc(ptr, size) + +//statistics gathered by the different activities +extern graph_decl const StatisticsMapping spillStatistics; +extern graph_decl const StatisticsMapping basicActivityStatistics; +extern graph_decl const StatisticsMapping groupActivityStatistics; +extern graph_decl const StatisticsMapping hashJoinActivityStatistics; +extern graph_decl const StatisticsMapping indexReadActivityStatistics; +extern graph_decl const StatisticsMapping indexWriteActivityStatistics; +extern graph_decl const StatisticsMapping joinActivityStatistics; +extern graph_decl const StatisticsMapping keyedJoinActivityStatistics; +extern graph_decl const StatisticsMapping lookupJoinActivityStatistics; +extern graph_decl const StatisticsMapping loopActivityStatistics; +extern graph_decl const StatisticsMapping diskReadActivityStatistics; +extern graph_decl const StatisticsMapping diskReadPartStatistics; +extern graph_decl const StatisticsMapping diskWriteActivityStatistics; +extern graph_decl const StatisticsMapping sortActivityStatistics; + +extern graph_decl const StatisticsMapping graphStatistics; +extern graph_decl const StatisticsMapping indexDistribActivityStatistics; + class BooleanOnOff { bool &tf; @@ -589,38 +609,5 @@ extern graph_decl __int64 getExpertOptInt64(const char *opt, __int64 dft=0); extern graph_decl StringBuffer &getExpertOptString(const char *opt, StringBuffer &out); extern graph_decl void setExpertOpt(const char *opt, const char *value); -//// -// IContextLogger -class CThorContextLogger : public CThorBaseContextLogger -{ -public: - CThorContextLogger() - { - if (globals->hasProp("@httpGlobalIdHeader")) - setHttpIdHeaderNames(globals->queryProp("@httpGlobalIdHeader"), globals->queryProp("@httpCallerIdHeader")); - } - virtual void CTXLOG(const char *format, ...) const override __attribute__((format(printf,2,3))) - { - va_list args; - va_start(args, format); - CTXLOGva(MCdebugProgress, thorJob, NoLogMsgCode, format, args); - va_end(args); - } - virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0))) - { - StringBuffer ss; - ss.append("ERROR"); - if (E) - ss.append(": ").append(E->errorCode()); - if (file) - ss.appendf(": %s(%d) ", file, line); - if (E) - E->errorMessage(ss.append(": ")); - if (format) - ss.append(": ").valist_appendf(format, args); - LOG(MCoperatorProgress, thorJob, "%s", ss.str()); - } -}; - #endif