Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29817 Eliminate seeks, scans & wildseeks from KeyStatsCollector and track cache stats in hthor #17541

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef THORCOMMON_HPP
#define THORCOMMON_HPP

#include "jlog.hpp"
#include "jiface.hpp"
#include "jcrc.hpp"
#include "jlzw.hpp"
Expand Down Expand Up @@ -672,6 +673,92 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext
protected:
ICodeContext * ctx;
};
class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
protected:
const LogMsgJobInfo job;
unsigned traceLevel = 1;
Owned<ISpan> activeSpan;
mutable CRuntimeStatisticCollection stats;
public:
CStatsContextLogger(const CRuntimeStatisticCollection &_mapping, const LogMsgJobInfo & _job=unknownJob) : job(_job), stats(_mapping) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial/formatting: double space before &_mapping


virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial/formatting: double space before __attribute

{
VALOG(cat, job, code, format, args);
}
virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0)))
{
StringBuffer ss;
ss.append("ERROR");
if (E)
ss.append(": ").append(E->errorCode());
if (file)
ss.appendf(": %s(%d) ", file, line);
if (E)
E->errorMessage(ss.append(": "));
if (format)
ss.append(": ").valist_appendf(format, args);
LOG(MCoperatorProgress, queryJob(), "%s", ss.str());
}
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.addStatisticAtomic(kind, value);
}
virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.setStatistic(kind, value);
}
virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
{
stats.merge(from);
}
virtual unsigned queryTraceLevel() const override
{
return traceLevel;
}
virtual void setActiveSpan(ISpan * span) override
{
activeSpan.set(span);
}
virtual IProperties * getClientHeaders() const override
{
if (!activeSpan)
return nullptr;
return ::getClientHeaders(activeSpan);
}
virtual const char *queryGlobalId() const override
{
if (!activeSpan)
return nullptr;
return activeSpan->queryGlobalId();
}
virtual const char *queryLocalId() const override
{
if (!activeSpan)
return nullptr;
return activeSpan->queryLocalId();
}
virtual const char *queryCallerId() const override
{
if (!activeSpan)
return nullptr;
return activeSpan->queryCallerId();
}
virtual const CRuntimeStatisticCollection &queryStats() const override
{
return stats;
}
virtual void recordStatistics(IStatisticGatherer &progress) const override
{
stats.recordStatistics(progress, false);
}
void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous)
{
previous.updateDelta(to, stats);
}
virtual const LogMsgJobInfo & queryJob() const override { return job; }
};

extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
Expand Down
92 changes: 31 additions & 61 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thorcommon.hpp"
#include "rtldynfield.hpp"
#include "thorfile.hpp"
#include "jstats.h"

#define MAX_FETCH_LOOKAHEAD 1000

Expand Down Expand Up @@ -200,24 +201,10 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
{
CHThorActivityBase::updateProgress(progress);
StatsActivityScope scope(progress, activityId);
contextLogger.recordStatistics(progress);
progress.addStatistic(StNumPostFiltered, queryPostFiltered());
progress.addStatistic(StNumIndexSeeks, querySeeks());
progress.addStatistic(StNumIndexScans, queryScans());
progress.addStatistic(StNumIndexWildSeeks, queryWildSeeks());
}

virtual unsigned querySeeks() const
{
return seeks + (klManager ? klManager->querySeeks() : 0);
}
virtual unsigned queryScans() const
{
return scans + (klManager ? klManager->queryScans() : 0);
}
virtual unsigned queryWildSeeks() const
{
return wildseeks + (klManager ? klManager->queryWildSeeks() : 0);
}
virtual unsigned queryPostFiltered() const
{
return postFiltered;
Expand Down Expand Up @@ -281,10 +268,8 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
UnsignedArray superIndexCache;
unsigned keyIndexCacheIdx = 0;

unsigned seeks;
unsigned scans;
unsigned postFiltered;
unsigned wildseeks;
CStatsContextLogger contextLogger;
bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
bool localSortKey = false;
bool initializedFileInfo = false;
Expand All @@ -306,16 +291,13 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
};

CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
nextPartNumber = 0;

eclKeySize.set(helper.queryDiskRecordSize());

postFiltered = 0;
seeks = 0;
scans = 0;
wildseeks = 0;
helper.setCallback(&callback);
limitTransformExtra = nullptr;
if (_node)
Expand Down Expand Up @@ -437,7 +419,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
{
Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
verifyIndex(tlk);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false);
initManager(tlman, true);
while(tlman->lookup(false) && (count<=limit))
{
Expand Down Expand Up @@ -473,7 +455,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r
verifyIndex(kidx);
if (limit != (unsigned) -1)
{
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, NULL, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &contextLogger, helper.hasNewSegmentMonitors(), false);
initManager(kman, false);
result += kman->checkCount(limit-result);
}
Expand Down Expand Up @@ -572,10 +554,10 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk)
manager->reset();
}

void CHThorIndexReadActivityBase::initPart()
{
void CHThorIndexReadActivityBase::initPart()
{
assertex(!keyIndex->isTopLevelKey());
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, NULL, helper.hasNewSegmentMonitors(), false));
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &contextLogger, helper.hasNewSegmentMonitors(), false));
initManager(klManager, false);
callback.setManager(klManager, nullptr);
}
Expand All @@ -584,12 +566,7 @@ void CHThorIndexReadActivityBase::killPart()
{
callback.setManager(nullptr, nullptr);
if (klManager)
{
seeks += klManager->querySeeks();
scans += klManager->queryScans();
wildseeks += klManager->queryWildSeeks();
klManager.clear();
}
}

bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
Expand All @@ -614,7 +591,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart()
if(!tlk)
openTlk();
verifyIndex(tlk);
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false));
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false));
initManager(tlManager, true);
nextPartNumber = 0;
return nextMultiPart();
Expand Down Expand Up @@ -3044,11 +3021,11 @@ class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements
Owned<IKeyManager> manager;
IAgentContext &agent;
DistributedKeyLookupHandler * tlk;

IContextLogger &contextLogger;
public:
IMPLEMENT_IINTERFACE;

KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent);
KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger);

~KeyedLookupPartHandler()
{
Expand Down Expand Up @@ -3098,6 +3075,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
Owned<IThreadPool> threadPool;
IntArray subSizes;
IAgentContext &agent;
IContextLogger &contextLogger;

void addFile(IDistributedFile &f)
{
Expand All @@ -3108,7 +3086,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
for (unsigned idx = 0; idx < numParts; idx++)
{
IDistributedFilePart *part = f.getPart(idx);
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent));
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent, contextLogger));
}
keyFiles.append(OLINK(f));
tlks.append(*f.getPart(numParts));
Expand All @@ -3118,8 +3096,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
public:
IMPLEMENT_IINTERFACE;

DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: owner(_owner), file(f), agent(_agent)
DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger)
{
threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
IDistributedSuperFile *super = f->querySuperFile();
Expand Down Expand Up @@ -3184,7 +3162,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
//Owned<IRecordLayoutTranslator>
trans.setown(owner.getLayoutTranslator(&f));
owner.verifyIndex(&f, index, trans);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false);
managers.append(*manager.getLink());
}
opened = true;
Expand Down Expand Up @@ -3213,8 +3191,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; }
};

KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk)
KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk), contextLogger(_contextLogger)
{
}

Expand All @@ -3223,7 +3201,7 @@ void KeyedLookupPartHandler::openPart()
if(manager)
return;
Owned<IKeyIndex> index = openKeyFile(*part);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
const IDynamicTransform * trans = tlk->queryRecordLayoutTranslator();
if(trans && !index->isTopLevelKey())
manager->setLayoutTranslator(trans);
Expand All @@ -3238,13 +3216,14 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
IJoinProcessor &owner;
IAgentContext &agent;
bool opened;
IContextLogger &contextLogger;

public:
IMPLEMENT_IINTERFACE;


MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: file(f), owner(_owner), agent(_agent), opened(false)
MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: file(f), owner(_owner), agent(_agent), opened(false), contextLogger(_contextLogger)
{
super = f->querySuperFile();
if (super)
Expand Down Expand Up @@ -3303,7 +3282,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
{
Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
owner.verifyIndex(&f, index, trans);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
}
else
{
Expand All @@ -3316,7 +3295,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
parts->addIndex(index.getLink());
}
owner.verifyIndex(&f, index, trans);
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, nullptr, owner.hasNewSegmentMonitors(), false));
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &contextLogger, owner.hasNewSegmentMonitors(), false));
}
if(trans)
manager->setLayoutTranslator(trans);
Expand Down Expand Up @@ -3412,9 +3391,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
RelaxedAtomic<unsigned> prefiltered;
RelaxedAtomic<unsigned> postfiltered;
RelaxedAtomic<unsigned> skips;
unsigned seeks;
unsigned scans;
unsigned wildseeks;
OwnedRowArray extractedRows;
Owned <ILocalOrDistributedFile> ldFile;
IDistributedFile * dFile;
Expand All @@ -3426,15 +3402,15 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
Owned<const IDynamicTransform> translator;
RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
bool isCodeSigned = false;
CStatsContextLogger contextLogger;

public:
CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
prefiltered = 0;
postfiltered = 0;
skips = 0;
seeks = 0;
scans = 0;
eclKeySize.set(helper.queryIndexRecordSize());
if (_node)
{
Expand Down Expand Up @@ -3990,9 +3966,9 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
mono = useMonolithic(*dFile);
}
if (mono)
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent, contextLogger));
else
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent, contextLogger));
agent.logFileAccess(dFile, "HThor", "READ", graph);
}
else
Expand All @@ -4015,10 +3991,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
virtual void doneManager(IKeyManager * manager)
{
manager->releaseSegmentMonitors();
CriticalBlock b(statsCrit);
seeks += manager->querySeeks();
scans += manager->queryScans();
wildseeks += manager->queryWildSeeks();
}

virtual bool addMatch(MatchSet * ms, IKeyManager * manager)
Expand Down Expand Up @@ -4089,9 +4061,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
progress.addStatistic(StNumPreFiltered, prefiltered);
progress.addStatistic(StNumPostFiltered, postfiltered);
progress.addStatistic(StNumIndexSkips, skips);
progress.addStatistic(StNumIndexSeeks, seeks);
progress.addStatistic(StNumIndexScans, scans);
progress.addStatistic(StNumIndexWildSeeks, wildseeks);
contextLogger.recordStatistics(progress);
}

protected:
Expand Down
4 changes: 0 additions & 4 deletions fs/dafsclient/rmtfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2587,10 +2587,6 @@ class CRemoteKey : public CSimpleInterfaceOf<IIndexLookup>
pending = true;
return prefetchBuffer.queryRow();
}
virtual unsigned querySeeks() const override { return 0; } // not sure how best to handle these, perhaps should log/record somewhere on server-side
virtual unsigned queryScans() const override { return 0; }
virtual unsigned querySkips() const override { return 0; }
virtual unsigned queryWildSeeks() const override { return 0; }
};


Expand Down
Loading