Skip to content

Commit

Permalink
HPCC-32031 Generate summary information in workunit to speed up file …
Browse files Browse the repository at this point in the history
…list operations

Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Jun 26, 2024
1 parent a3b036a commit 342ff2b
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 49 deletions.
85 changes: 61 additions & 24 deletions common/pkgfiles/referencedfilelist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,32 +965,13 @@ void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)

bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
{
Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
ForEach(*graphs)
SummaryMap files;
if (cw->getSummary(SummaryType::ReadFile, files) &&
cw->getSummary(SummaryType::ReadIndex, files))
{
Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false, false);
Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
ForEach(*iter)
for (const auto& [lName, isOpt] : files)
{
IPropertyTree &node = iter->query();
bool isOpt = false;
const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
if (!logicalName)
logicalName = node.queryProp("att[@name='_indexFileName']/@value");
if (!logicalName)
continue;

isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value");
if (!isOpt)
isOpt = node.getPropBool("att[@name='_isOpt']/@value");

ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
//not likely to be part of roxie queries, but for forward compatibility:
if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
continue;
if (node.getPropBool("att[@name='_isSpill']/@value") ||
node.getPropBool("att[@name='_isTransformSpill']/@value"))
continue;
const char *logicalName = lName.c_str();
StringArray subfileNames;
unsigned flags = isOpt ? RefFileOptional : RefFileNotOptional;
if (pkg)
Expand Down Expand Up @@ -1018,6 +999,62 @@ bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackag
ensureFile(logicalName, flags, NULL, false, &subfileNames);
}
}
else
{
Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
ForEach(*graphs)
{
Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false, false);
Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
ForEach(*iter)
{
IPropertyTree &node = iter->query();
bool isOpt = false;
const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
if (!logicalName)
logicalName = node.queryProp("att[@name='_indexFileName']/@value");
if (!logicalName)
continue;

isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value");
if (!isOpt)
isOpt = node.getPropBool("att[@name='_isOpt']/@value");

ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
//not likely to be part of roxie queries, but for forward compatibility:
if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
continue;
if (node.getPropBool("att[@name='_isSpill']/@value") ||
node.getPropBool("att[@name='_isTransformSpill']/@value"))
continue;
StringArray subfileNames;
unsigned flags = isOpt ? RefFileOptional : RefFileNotOptional;
if (pkg)
{
const char *pkgid = pkg->locateSuperFile(logicalName);
if (pkgid)
{
flags |= (RefFileSuper | RefFileInPackage);
Owned<ISimpleSuperFileEnquiry> ssfe = pkg->resolveSuperFile(logicalName);
if (ssfe && ssfe->numSubFiles()>0)
{
unsigned count = ssfe->numSubFiles();
while (count--)
{
StringBuffer subfile;
ssfe->getSubFileName(count, subfile);
ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid, false, nullptr);
subfileNames.append(subfile);
}
}
}
ensureFile(logicalName, flags, pkgid, pkg->isCompulsory(), &subfileNames);
}
else
ensureFile(logicalName, flags, NULL, false, &subfileNames);
}
}
}
return pkg ? pkg->isCompulsory() : false;
}

Expand Down
62 changes: 61 additions & 1 deletion common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4437,6 +4437,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ return c->getFileAccessCost(); }
// Forwards to the underlying object c and returns its result unchanged.
virtual cost_type getCompileCost() const
{ return c->getCompileCost(); }
// Forwards the summary lookup for the given type to the underlying object c;
// map and the boolean result are passed straight through.
virtual bool getSummary(SummaryType type, SummaryMap &map) const override
{ return c->getSummary(type, map); }
// Forwards both trees to the underlying object c.
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
{ return c->import(wuTree, graphProgressTree); }

Expand Down Expand Up @@ -4503,6 +4505,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ c->setUser(value); }
// Forwards the new scope value to the underlying object c.
virtual void setWuScope(const char * value)
{ c->setWuScope(value); }
// Forwards the summary map for the given type to the underlying object c.
virtual void setSummary(SummaryType type, const SummaryMap &map) override
{ c->setSummary(type, map); }
// Forwards all arguments, in the same order, to the underlying object c and
// returns whatever it returns.
virtual IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
{ return c->addWorkflowItem(wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor); }
virtual void syncRuntimeWorkflow(IWorkflowItemArray * array)
Expand Down Expand Up @@ -8721,6 +8725,58 @@ void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool
}
}

// Returns the fixed name associated with a summary type. Callers in this file
// use it as the child property name beneath "Summaries".
// Raises throwUnexpected() for SummaryType::NumItems and for any value that is
// not one of the named summary types.
static const char *summaryTypeName(SummaryType type)
{
    // Deliberately no 'default' label: every enumerator is listed explicitly so
    // that a compiler with switch-coverage warnings enabled reports a newly
    // added SummaryType that has not been given a name here.
    switch (type)
    {
    case SummaryType::ReadFile: return "ReadFile";   // SummaryType::First has the same value
    case SummaryType::ReadIndex: return "ReadIndex";
    case SummaryType::WriteFile: return "WriteFile";
    case SummaryType::WriteIndex: return "WriteIndex";
    case SummaryType::SpillFile: return "SpillFile";
    case SummaryType::Service: return "Service";
    case SummaryType::NumItems: break;               // count marker, not a real summary type
    }
    // Reached for NumItems and for out-of-range values cast to SummaryType.
    throwUnexpected();
}

// Merges the stored summary list for the given type into map.
// Returns false, leaving map untouched, when no "Summaries/<type>" property is
// present; returns true otherwise.
// Entries already in map are kept, except that a stored non-optional entry
// always forces the flag to false, while a stored optional entry is only added
// when the name is not yet present.
bool CLocalWorkUnit::getSummary(SummaryType type, SummaryMap &map) const
{
    VStringBuffer xpath("Summaries/%s", summaryTypeName(type));
    CriticalBlock block(crit);
    const char *stored = p->queryProp(xpath);
    if (!stored)
        return false;

    StringArray entries;
    entries.appendList(stored, "\n");
    ForEachItemIn(i, entries)
    {
        const char *entry = entries.item(i);
        if (!entry || !*entry)
            continue;

        // The leading character is a marker: '?' means optional, anything else
        // means not optional. The name itself starts at the second character.
        const bool optional = (*entry == '?');
        const std::string key(entry + 1);
        if (optional)
            map.try_emplace(key, true);   // leaves an existing entry untouched
        else
            map[key] = false;             // non-optional overrides whatever was there
    }
    return true;
}

// Serialises map and stores it as the property named after the given type
// beneath the "Summaries" branch (created on demand by ensurePTree).
// Each entry becomes one line: a marker character ('?' when the flag is true,
// '+' otherwise) followed by the name; lines are separated by '\n'.
void CLocalWorkUnit::setSummary(SummaryType type, const SummaryMap &map)
{
    StringBuffer serialised;
    bool first = true;
    for (const auto &entry : map)
    {
        if (!first)
            serialised.append('\n');
        first = false;
        const char marker = entry.second ? '?' : '+';
        serialised.append(marker).append(entry.first.c_str());
    }

    // The text is built before taking the lock so that only the tree update is guarded.
    CriticalBlock block(crit);
    IPropertyTree *branch = ensurePTree(p, "Summaries");
    branch->setProp(summaryTypeName(type), serialised);
}

void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite)
{
StringBuffer lower;
Expand Down Expand Up @@ -13980,6 +14036,11 @@ extern WORKUNIT_API void descheduleWorkunit(char const * wuid)
doDescheduleWorkkunit(wuid);
}

// Convenience wrapper that hands map to wu->setSummary() for the given summary type.
// wu is dereferenced unconditionally, so callers must pass a non-null workunit.
extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map)
{
wu->setSummary(summaryType, map);
}

extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid)
{
StringBuffer scopestr;
Expand Down Expand Up @@ -14008,7 +14069,6 @@ class WuTimingUpdater : implements ITimeReportInfo
StatisticKind kind;
};


extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer)
{
WuTimingUpdater target(wu, SSTsection, StTimeTotalExecute);
Expand Down
27 changes: 27 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <vector>
#include <list>
#include <utility>
#include <map>
#include <string>

#define LEGACY_GLOBAL_SCOPE "workunit"
Expand Down Expand Up @@ -1179,6 +1180,28 @@ interface IConstWUScopeIterator : extends IScmIterator
//---------------------------------------------------------------------------------------------------------------------
//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.

// Identifies which kind of summary list is being stored in, or read from, a workunit.
// Be sure to update summaryTypeName in workunit.cpp if adding anything here
enum class SummaryType
{
// First carries the value of the first real entry (ReadFile is defined in terms of it).
First,
ReadFile = First,
ReadIndex,
WriteFile,
WriteIndex,
SpillFile,
Service,
// NumItems must stay last: it is the number of real entries and is used to
// size per-type arrays; summaryTypeName does not map it to a name.
NumItems
};

// Ordering functor for std::string keys that delegates to stricmp(): returns
// true when left sorts strictly before right according to that comparison.
struct ncasecomp
{
    bool operator() (const std::string &left, const std::string &right) const
    {
        const int order = stricmp(left.c_str(), right.c_str());
        return order < 0;
    }
};

// Maps a name to its "is optional" flag; keys are ordered and matched using ncasecomp.
typedef std::map<std::string, bool, ncasecomp> SummaryMap;

interface IWorkUnit;
interface IUserDescriptor;

Expand Down Expand Up @@ -1267,6 +1290,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual unsigned queryFileUsage(const char * filename) const = 0;
virtual IConstWUFileUsageIterator * getFieldUsage() const = 0;
virtual bool getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const = 0;
// Adds the summary entries of the given type to result. As implemented by CLocalWorkUnit,
// returns false (result unchanged) when the workunit holds no summary of that type.
virtual bool getSummary(SummaryType type, SummaryMap &result) const = 0;

virtual unsigned getCodeVersion() const = 0;
virtual unsigned getWuidVersion() const = 0;
Expand Down Expand Up @@ -1400,6 +1424,7 @@ interface IWorkUnit : extends IConstWorkUnit
virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val) = 0;
virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend) = 0;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree = nullptr) = 0;
// Stores map as the workunit's summary of the given type (name -> is-optional flag).
virtual void setSummary(SummaryType type, const SummaryMap &map) = 0;
virtual IConstWorkUnit * unlock() = 0;
};

Expand Down Expand Up @@ -1722,6 +1747,8 @@ extern WORKUNIT_API void gatherLibraryNames(StringArray &names, StringArray &unr
//If we add any more parameters we should consider returning an object that can be updated
extern WORKUNIT_API void associateLocalFile(IWUQuery * query, WUFileType type, const char * name, const char * description, unsigned crc, unsigned minActivity=0, unsigned maxActivity=0);

extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map);

interface ITimeReporter;
extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid=0);
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer);
Expand Down
3 changes: 3 additions & 0 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public:
void setTimeScheduled(const IJlibDateTime &val);
virtual void subscribe(WUSubscribeOptions options) {};

virtual bool getSummary(SummaryType type, SummaryMap &map) const override;
virtual void setSummary(SummaryType type, const SummaryMap &map) override;

// ILocalWorkUnit - used for debugging etc
void loadXML(const char *xml);
void serialize(MemoryBuffer &tgt);
Expand Down
12 changes: 6 additions & 6 deletions ecl/hqlcpp/hqlckey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ void HqlCppTranslator::buildKeyedJoinExtra(ActivityInstance & instance, IHqlExpr

//virtual const char * getFileName() = 0; // Returns filename of raw file fpos'es refer into
if (info->isFullJoin())
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()));
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()), SummaryType::ReadFile, info->isKeyOpt());

//virtual bool diskAccessRequired() = 0;
if (info->isFullJoin())
Expand Down Expand Up @@ -1229,7 +1229,7 @@ void HqlCppTranslator::buildKeyJoinIndexReadHelper(ActivityInstance & instance,
info->buildExtractIndexReadFields(instance.startctx);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()));
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()), SummaryType::ReadIndex, info->isKeyOpt());

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info->queryOriginalKey();
Expand Down Expand Up @@ -1489,7 +1489,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyedDistribute(BuildCtx & ctx
doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic);
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic, SummaryType::ReadIndex, info.isKeyOpt());

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info.queryRawKey();
Expand Down Expand Up @@ -1583,7 +1583,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyDiff(BuildCtx & ctx, IHqlEx
noteAllFieldsUsed(updated);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteFile, false);

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down Expand Up @@ -1626,10 +1626,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyPatch(BuildCtx & ctx, IHqlE
noteAllFieldsUsed(original);

//virtual const char * getPatchName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true);
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true, SummaryType::ReadFile, false);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteIndex, false);

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down
21 changes: 18 additions & 3 deletions ecl/hqlcpp/hqlcpp.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1886,8 +1886,8 @@ public:
void doBuildFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);
void doBuildUserFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);

void addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr);
void buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic);
void addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, SummaryType summaryType);
void buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic, SummaryType summaryType, bool isOpt);
void buildRefFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * dataset);
void createAccessFunctions(StringBuffer & helperFunc, BuildCtx & declarectx, unsigned prio, const char * interfaceName, const char * object);

Expand All @@ -1911,7 +1911,7 @@ protected:
void buildIteratorNext(BuildCtx & ctx, IHqlExpression * iter, IHqlExpression * row);
bool shouldEvaluateSelectAsAlias(BuildCtx & ctx, IHqlExpression * expr);
IWUResult * createWorkunitResult(int sequence, IHqlExpression * nameExpr);
void noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic);
void noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic, SummaryType summaryType, bool isOpt);
bool checkGetResultContext(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr & tgt);
void buildGetResultInfo(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr * boundTarget, const CHqlBoundTarget * targetAssign);
void buildGetResultSetInfo(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr * boundTarget, const CHqlBoundTarget * targetAssign);
Expand Down Expand Up @@ -2038,6 +2038,7 @@ protected:
bool isNeverDistributed(IHqlExpression * expr);

void ensureWorkUnitUpdated();
void addWorkunitSummaries();
bool getDebugFlag(const char * name, bool defValue);
void initOptions();
void postProcessOptions();
Expand Down Expand Up @@ -2140,6 +2141,20 @@ protected:
Owned<ITimeReporter> timeReporter;
CIArrayOf<SourceFieldUsage> trackedSources;
HqlExprArray tracedActivities;

// These are used to generate workunit summary info, to avoid having to walk the xgmml to get it
SummaryMap summaries[(int) SummaryType::NumItems];
void noteSummaryInfo(const char *name, SummaryType type, bool isOpt)
{
SummaryMap &map = summaries[(int) type];
if (isOpt)
{
if (map.find(name) == map.end())
map[name] = true;
}
else
map[name] = false;
}
};


Expand Down
Loading

0 comments on commit 342ff2b

Please sign in to comment.