Skip to content

Commit

Permalink
HPCC-29817 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>

HPCC-29817 Changes following review

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 20, 2023
1 parent 0a62834 commit a63c70a
Show file tree
Hide file tree
Showing 25 changed files with 233 additions and 314 deletions.
9 changes: 8 additions & 1 deletion common/thorhelper/roxiehelper.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum TracingCategory
class LogItem;
interface IRoxieContextLogger : extends IContextLogger
{
IRoxieContextLogger(IPropertyTree *cfg) : IContextLogger(cfg) {}
// Override base interface with versions that add prefix
// We could consider moving some or all of these down into IContextLogger
virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
Expand All @@ -74,7 +75,13 @@ interface IRoxieContextLogger : extends IContextLogger
getLogPrefix(prefix);
CTXLOGaeva(E, file, line, prefix.str(), format, args);
}

virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
{
va_list args;
va_start(args, format);
logOperatorExceptionVA(E, file, line, format, args);
va_end(args);
}
virtual StringBuffer &getLogPrefix(StringBuffer &ret) const = 0;
virtual bool isIntercepted() const = 0;
virtual void CTXLOGa(TracingCategory category, const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *prefix, const char *text) const = 0;
Expand Down
103 changes: 0 additions & 103 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#ifndef THORCOMMON_HPP
#define THORCOMMON_HPP

#include "jlog.hpp"
#include "jiface.hpp"
#include "jcrc.hpp"
#include "jlzw.hpp"
Expand Down Expand Up @@ -673,108 +672,6 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext
protected:
ICodeContext * ctx;
};
class CThorBaseContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
protected:
unsigned traceLevel = 1;
LogTrace logTrace;
mutable CRuntimeStatisticCollection stats;

public:
CThorBaseContextLogger() : stats(jhtreeCacheStatistics)
{
}
virtual void CTXLOG(const char *format, ...) const override __attribute__((format(printf,2,3)))
{
va_list args;
va_start(args, format);
CTXLOGva(MCdebugProgress, unknownJob, NoLogMsgCode, format, args);
va_end(args);
}

virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
{
VALOG(cat, job, code, format, args);
}
virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0)))
{
StringBuffer ss;
ss.append("ERROR");
if (E)
ss.append(": ").append(E->errorCode());
if (file)
ss.appendf(": %s(%d) ", file, line);
if (E)
E->errorMessage(ss.append(": "));
if (format)
ss.append(": ").valist_appendf(format, args);
LOG(MCoperatorProgress, unknownJob, "%s", ss.str());
}
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.addStatisticAtomic(kind, value);
}
virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.setStatistic(kind, value);
}
virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
{
stats.merge(from);
}
virtual unsigned queryTraceLevel() const override
{
return traceLevel;
}
virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
{
logTrace.setGlobalId(id);
}
virtual void setCallerId(const char *id) override
{
logTrace.setCallerId(id);
}
virtual const char *queryGlobalId() const override
{
return logTrace.queryGlobalId();
}
virtual const char *queryLocalId() const override
{
return logTrace.queryLocalId();
}
virtual const char *queryCallerId() const override
{
return logTrace.queryCallerId();
}
virtual void setHttpIdHeaderNames(const char *global, const char *caller) override
{
logTrace.setHttpIdHeaderNames(global, caller);
}
virtual const char *queryGlobalIdHttpHeaderName() const override
{
return logTrace.queryGlobalIdHTTPHeaderName();
}
virtual const char *queryCallerIdHttpHeaderName() const override
{
return logTrace.queryCallerIdHTTPHeaderName();
}
virtual const CRuntimeStatisticCollection &queryStats() const override
{
return stats;
}
virtual void recordStatistics(IStatisticGatherer &progress) const override
{
stats.recordStatistics(progress, false);
}
void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous)
{
previous.updateDelta(to, stats);
}
void reset()
{
stats.reset();
}
};

extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
Expand Down
1 change: 1 addition & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IContextLogger & queryContextLogger() = 0;
};

#endif // AGENTCTX_HPP_INCL
3 changes: 2 additions & 1 deletion ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ class EclAgentPluginCtx : public SimplePluginCtx
//=======================================================================================

EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler),
contextLogger(agentTopology, jhtreeCacheStatistics, unknownJob)
{
isAborting = false;
isStandAloneExe = false;
Expand Down
12 changes: 10 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,11 @@ public:
virtual double queryAgentMachineCost() const override
{
return ctx->queryAgentMachineCost();
};

}
virtual IContextLogger & queryContextLogger() override
{
return ctx->queryContextLogger();
}
protected:
IAgentContext * ctx;
};
Expand Down Expand Up @@ -392,6 +395,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
CStatsContextLogger contextLogger;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +709,10 @@ public:
{
return agentMachineCost;
}
virtual IContextLogger & queryContextLogger()
{
return contextLogger;
}
};

//---------------------------------------------------------------------------
Expand Down
4 changes: 0 additions & 4 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -3239,10 +3239,6 @@ protected:
void onLimitExceeded();
};

// improvement: override constructor to store setHttpIdHeaderNames, override CTXLOG & logOperatorExceptionVA to log LogMsgJobInfo
class CHThorContextLogger : public CThorBaseContextLogger
{
};

#define MAKEFACTORY(NAME) \
extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind, EclGraph & _graph) \
Expand Down
46 changes: 20 additions & 26 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
{
CHThorActivityBase::updateProgress(progress);
StatsActivityScope scope(progress, activityId);
contextLogger.recordStatistics(progress);
agent.queryContextLogger().recordStatistics(progress);
progress.addStatistic(StNumPostFiltered, queryPostFiltered());
}

Expand Down Expand Up @@ -268,7 +268,6 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
unsigned keyIndexCacheIdx = 0;

unsigned postFiltered;
CHThorContextLogger contextLogger;
bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
bool localSortKey = false;
bool initializedFileInfo = false;
Expand Down Expand Up @@ -418,7 +417,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
{
Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
verifyIndex(tlk);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false);
initManager(tlman, true);
while(tlman->lookup(false) && (count<=limit))
{
Expand Down Expand Up @@ -454,7 +453,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r
verifyIndex(kidx);
if (limit != (unsigned) -1)
{
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &contextLogger, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false);
initManager(kman, false);
result += kman->checkCount(limit-result);
}
Expand Down Expand Up @@ -556,7 +555,7 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk)
void CHThorIndexReadActivityBase::initPart()
{
assertex(!keyIndex->isTopLevelKey());
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &contextLogger, helper.hasNewSegmentMonitors(), false));
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false));
initManager(klManager, false);
callback.setManager(klManager, nullptr);
}
Expand Down Expand Up @@ -590,7 +589,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart()
if(!tlk)
openTlk();
verifyIndex(tlk);
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false));
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &agent.queryContextLogger(), helper.hasNewSegmentMonitors(), false));
initManager(tlManager, true);
nextPartNumber = 0;
return nextMultiPart();
Expand Down Expand Up @@ -3020,11 +3019,10 @@ class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements
Owned<IKeyManager> manager;
IAgentContext &agent;
DistributedKeyLookupHandler * tlk;
CHThorContextLogger &contextLogger;
public:
IMPLEMENT_IINTERFACE;

KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger);
KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent);

~KeyedLookupPartHandler()
{
Expand Down Expand Up @@ -3074,7 +3072,6 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
Owned<IThreadPool> threadPool;
IntArray subSizes;
IAgentContext &agent;
CHThorContextLogger &contextLogger;

void addFile(IDistributedFile &f)
{
Expand All @@ -3085,7 +3082,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
for (unsigned idx = 0; idx < numParts; idx++)
{
IDistributedFilePart *part = f.getPart(idx);
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent, contextLogger));
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent));
}
keyFiles.append(OLINK(f));
tlks.append(*f.getPart(numParts));
Expand All @@ -3095,8 +3092,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
public:
IMPLEMENT_IINTERFACE;

DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
: owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger)
DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: owner(_owner), file(f), agent(_agent)
{
threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
IDistributedSuperFile *super = f->querySuperFile();
Expand Down Expand Up @@ -3161,7 +3158,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
//Owned<IRecordLayoutTranslator>
trans.setown(owner.getLayoutTranslator(&f));
owner.verifyIndex(&f, index, trans);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false);
managers.append(*manager.getLink());
}
opened = true;
Expand Down Expand Up @@ -3190,8 +3187,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; }
};

KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk), contextLogger(_contextLogger)
KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk)
{
}

Expand All @@ -3200,7 +3197,7 @@ void KeyedLookupPartHandler::openPart()
if(manager)
return;
Owned<IKeyIndex> index = openKeyFile(*part);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false));
const IDynamicTransform * trans = tlk->queryRecordLayoutTranslator();
if(trans && !index->isTopLevelKey())
manager->setLayoutTranslator(trans);
Expand All @@ -3215,14 +3212,13 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
IJoinProcessor &owner;
IAgentContext &agent;
bool opened;
CHThorContextLogger &contextLogger;

public:
IMPLEMENT_IINTERFACE;


MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
: file(f), owner(_owner), agent(_agent), opened(false), contextLogger(_contextLogger)
MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: file(f), owner(_owner), agent(_agent), opened(false)
{
super = f->querySuperFile();
if (super)
Expand Down Expand Up @@ -3281,7 +3277,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
{
Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
owner.verifyIndex(&f, index, trans);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false));
}
else
{
Expand All @@ -3294,7 +3290,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
parts->addIndex(index.getLink());
}
owner.verifyIndex(&f, index, trans);
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &contextLogger, owner.hasNewSegmentMonitors(), false));
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &agent.queryContextLogger(), owner.hasNewSegmentMonitors(), false));
}
if(trans)
manager->setLayoutTranslator(trans);
Expand Down Expand Up @@ -3401,7 +3397,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
Owned<const IDynamicTransform> translator;
RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
bool isCodeSigned = false;
CHThorContextLogger contextLogger;

public:
CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
Expand Down Expand Up @@ -3965,9 +3960,9 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
mono = useMonolithic(*dFile);
}
if (mono)
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent, contextLogger));
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
else
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent, contextLogger));
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
agent.logFileAccess(dFile, "HThor", "READ", graph);
}
else
Expand All @@ -3985,7 +3980,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
manager->finishSegmentMonitors();
manager->reset();
manager->resetCounts();
contextLogger.reset();
}

virtual void doneManager(IKeyManager * manager)
Expand Down Expand Up @@ -4061,7 +4055,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
progress.addStatistic(StNumPreFiltered, prefiltered);
progress.addStatistic(StNumPostFiltered, postfiltered);
progress.addStatistic(StNumIndexSkips, skips);
contextLogger.recordStatistics(progress);
agent.queryContextLogger().recordStatistics(progress);
}

protected:
Expand Down
Loading

0 comments on commit a63c70a

Please sign in to comment.