Skip to content

Commit

Permalink
Merge pull request #18785 from richardkchapman/summaries
Browse files Browse the repository at this point in the history
HPCC-32031 Generate summary information in workunit to speed up file list operations

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 27, 2024
2 parents a3b036a + bcdbc49 commit 3c70c0d
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 50 deletions.
87 changes: 62 additions & 25 deletions common/pkgfiles/referencedfilelist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,34 +965,15 @@ void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)

bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
{
Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
ForEach(*graphs)
SummaryMap files;
if (cw->getSummary(SummaryType::ReadFile, files) &&
cw->getSummary(SummaryType::ReadIndex, files))
{
Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false, false);
Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
ForEach(*iter)
for (const auto& [lName, summaryFlags] : files)
{
IPropertyTree &node = iter->query();
bool isOpt = false;
const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
if (!logicalName)
logicalName = node.queryProp("att[@name='_indexFileName']/@value");
if (!logicalName)
continue;

isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value");
if (!isOpt)
isOpt = node.getPropBool("att[@name='_isOpt']/@value");

ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
//not likely to be part of roxie queries, but for forward compatibility:
if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
continue;
if (node.getPropBool("att[@name='_isSpill']/@value") ||
node.getPropBool("att[@name='_isTransformSpill']/@value"))
continue;
const char *logicalName = lName.c_str();
StringArray subfileNames;
unsigned flags = isOpt ? RefFileOptional : RefFileNotOptional;
unsigned flags = (summaryFlags & SummaryFlags::IsOpt) ? RefFileOptional : RefFileNotOptional;
if (pkg)
{
const char *pkgid = pkg->locateSuperFile(logicalName);
Expand All @@ -1018,6 +999,62 @@ bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackag
ensureFile(logicalName, flags, NULL, false, &subfileNames);
}
}
else
{
Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
ForEach(*graphs)
{
Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false, false);
Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
ForEach(*iter)
{
IPropertyTree &node = iter->query();
bool isOpt = false;
const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
if (!logicalName)
logicalName = node.queryProp("att[@name='_indexFileName']/@value");
if (!logicalName)
continue;

isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value");
if (!isOpt)
isOpt = node.getPropBool("att[@name='_isOpt']/@value");

ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
//not likely to be part of roxie queries, but for forward compatibility:
if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
continue;
if (node.getPropBool("att[@name='_isSpill']/@value") ||
node.getPropBool("att[@name='_isTransformSpill']/@value"))
continue;
StringArray subfileNames;
unsigned flags = isOpt ? RefFileOptional : RefFileNotOptional;
if (pkg)
{
const char *pkgid = pkg->locateSuperFile(logicalName);
if (pkgid)
{
flags |= (RefFileSuper | RefFileInPackage);
Owned<ISimpleSuperFileEnquiry> ssfe = pkg->resolveSuperFile(logicalName);
if (ssfe && ssfe->numSubFiles()>0)
{
unsigned count = ssfe->numSubFiles();
while (count--)
{
StringBuffer subfile;
ssfe->getSubFileName(count, subfile);
ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid, false, nullptr);
subfileNames.append(subfile);
}
}
}
ensureFile(logicalName, flags, pkgid, pkg->isCompulsory(), &subfileNames);
}
else
ensureFile(logicalName, flags, NULL, false, &subfileNames);
}
}
}
return pkg ? pkg->isCompulsory() : false;
}

Expand Down
69 changes: 68 additions & 1 deletion common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4437,6 +4437,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ return c->getFileAccessCost(); }
// Forwards to getCompileCost() on the underlying object held in 'c'.
virtual cost_type getCompileCost() const
{ return c->getCompileCost(); }
// Forwards to getSummary() on the underlying object held in 'c', passing 'type' and 'map'
// through unchanged and returning its result.
virtual bool getSummary(SummaryType type, SummaryMap &map) const override
{ return c->getSummary(type, map); }
// Forwards both trees to import() on the underlying object held in 'c'.
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
{ return c->import(wuTree, graphProgressTree); }

Expand Down Expand Up @@ -4503,6 +4505,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ c->setUser(value); }
// Forwards 'value' to setWuScope() on the underlying object held in 'c'.
virtual void setWuScope(const char * value)
{ c->setWuScope(value); }
// Forwards to setSummary() on the underlying object held in 'c', passing 'type' and 'map'
// through unchanged.
virtual void setSummary(SummaryType type, const SummaryMap &map) override
{ c->setSummary(type, map); }
// Forwards every argument, in order, to addWorkflowItem() on the underlying object held in 'c'
// and returns whatever that call returns.
virtual IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
{ return c->addWorkflowItem(wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor); }
virtual void syncRuntimeWorkflow(IWorkflowItemArray * array)
Expand Down Expand Up @@ -8721,6 +8725,65 @@ void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool
}
}

// Map a SummaryType onto the element name used for it beneath "Summaries" in the workunit tree
// (see getSummary/setSummary).  Any value without an explicit case - including
// SummaryType::NumItems/None - is rejected via throwUnexpected().
static const char *summaryTypeName(SummaryType type)
{
    const char *label = nullptr;
    switch (type)
    {
    case SummaryType::ReadFile:
        label = "ReadFile";
        break;
    case SummaryType::ReadIndex:
        label = "ReadIndex";
        break;
    case SummaryType::WriteFile:
        label = "WriteFile";
        break;
    case SummaryType::WriteIndex:
        label = "WriteIndex";
        break;
    case SummaryType::PersistFile:
        label = "PersistFile";
        break;
    case SummaryType::SpillFile:
        label = "SpillFile";
        break;
    case SummaryType::JobTemp:
        label = "JobTemp";
        break;
    case SummaryType::Service:
        label = "Service";
        break;
    default:
        throwUnexpected();
    }
    return label;
}

// Read the summary list stored at "Summaries/<type name>" and merge it into 'map'.
//
// The stored value is a newline-separated list of "<hex flags>:<name>" entries (the format
// written by setSummary).  Empty entries are ignored.  For a name that is not yet in 'map' the
// parsed flags are stored; for a name that is already present (for example from an earlier call
// with a different SummaryType) the flags are ANDed, so a flag survives only if every occurrence
// carries it.
//
// Returns false if no list is stored for 'type', or if an entry does not have ':' immediately
// after its hex flags.  In the latter case entries processed before the malformed one have
// already been merged into 'map'.  Returns true otherwise.
//
// 'crit' is held (via CriticalBlock) while the property is read and parsed.
bool CLocalWorkUnit::getSummary(SummaryType type, SummaryMap &map) const
{
    VStringBuffer xpath("Summaries/%s", summaryTypeName(type));
    CriticalBlock block(crit);
    const char *list = p->queryProp(xpath);
    if (!list)
        return false;
    StringArray s;
    s.appendList(list, "\n");
    ForEachItemIn(idx, s)
    {
        const char *entry = s.item(idx);
        if (!entry || !*entry)
            continue;
        char *end = nullptr;
        SummaryFlags flags = (SummaryFlags) strtol(entry, &end, 16);
        if (*end!=':')
            return false;  // unrecognized format
        const char *name = end+1;
        // Single lookup: insert if the name is new, otherwise combine with the existing flags.
        auto [match, inserted] = map.try_emplace(name, flags);
        if (!inserted)
            match->second = match->second & flags;
    }
    return true;
}

// Serialize 'map' and store it as the summary for 'type'.
//
// Each map entry becomes "<flags in hex>:<name>"; entries are joined with '\n' (no trailing
// newline).  The text is built before 'crit' is taken, and is then written to the property named
// by summaryTypeName(type) beneath the "Summaries" element, which ensurePTree is asked to
// provide.
void CLocalWorkUnit::setSummary(SummaryType type, const SummaryMap &map)
{
    StringBuffer serialized;
    for (auto entry = map.begin(); entry != map.end(); ++entry)
    {
        if (entry != map.begin())
            serialized.append('\n');
        serialized.appendf("%01x:%s", (unsigned) entry->second, entry->first.c_str());
    }

    CriticalBlock block(crit);
    IPropertyTree *summaryRoot = ensurePTree(p, "Summaries");
    summaryRoot->setProp(summaryTypeName(type), serialized);
}

void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite)
{
StringBuffer lower;
Expand Down Expand Up @@ -13980,6 +14043,11 @@ extern WORKUNIT_API void descheduleWorkunit(char const * wuid)
doDescheduleWorkkunit(wuid);
}

// Store 'map' as the summary of kind 'summaryType' on 'wu' by calling wu->setSummary().
// 'wu' is dereferenced unconditionally, so it must not be null.  'map' is only passed on to
// setSummary, which takes it by const reference.
extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map)
{
wu->setSummary(summaryType, map);
}

extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid)
{
StringBuffer scopestr;
Expand Down Expand Up @@ -14008,7 +14076,6 @@ class WuTimingUpdater : implements ITimeReportInfo
StatisticKind kind;
};


extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer)
{
WuTimingUpdater target(wu, SSTsection, StTimeTotalExecute);
Expand Down
39 changes: 39 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <vector>
#include <list>
#include <utility>
#include <map>
#include <string>

#define LEGACY_GLOBAL_SCOPE "workunit"
Expand Down Expand Up @@ -1179,6 +1180,40 @@ interface IConstWUScopeIterator : extends IScmIterator
//---------------------------------------------------------------------------------------------------------------------
//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.

// Kinds of summary list that can be stored in a workunit (see getSummary/setSummary).
// Each kind is persisted under a name produced by summaryTypeName() in workunit.cpp, so that
// function must be updated whenever an enumerator is added here.
enum class SummaryType
{
    ReadFile,
    ReadIndex,
    WriteFile,
    WriteIndex,
    PersistFile,
    SpillFile,
    JobTemp,
    Service,
    // Keep these at the end
    NumItems,               // one past the last real kind
    First = ReadFile,       // alias for the first real kind
    None = NumItems         // alias sharing the value of NumItems
};

// Flags held against each name in a SummaryMap.  CLocalWorkUnit::setSummary writes the value as
// the hex prefix of each "<flags>:<name>" entry and CLocalWorkUnit::getSummary parses it back.
enum SummaryFlags : byte
{
None = 0,
IsOpt = 0x01,       // ReferencedFileList::addFilesFromQuery maps this to RefFileOptional
IsSigned = 0x02,    // NOTE(review): presumably derived from the _signed_ attribute on the file/index - confirm in the code generator
};
// NOTE(review): presumably provides the bitwise operators for SummaryFlags (getSummary uses
// operator& on two SummaryFlags values) - the macro definition is not visible here.
BITMASK_ENUM(SummaryFlags);

// Strict-weak ordering for std::string keys based on stricmp(), used as the comparator of
// SummaryMap.  Two names for which stricmp() returns 0 are treated as the same key.
struct ncasecomp
{
    bool operator() (const std::string& lhs, const std::string& rhs) const
    {
        const int order = stricmp(lhs.c_str(), rhs.c_str());
        return order < 0;
    }
};

// Name -> flags map used to pass summary lists to and from a workunit; keys are ordered and
// compared with ncasecomp.
using SummaryMap = std::map<std::string, SummaryFlags, ncasecomp>;

interface IWorkUnit;
interface IUserDescriptor;

Expand Down Expand Up @@ -1267,6 +1302,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual unsigned queryFileUsage(const char * filename) const = 0;
virtual IConstWUFileUsageIterator * getFieldUsage() const = 0;
virtual bool getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const = 0;
// Merge the stored summary of the given type into 'result'.  In the CLocalWorkUnit
// implementation: returns false if no summary of that type is stored or an entry is in an
// unrecognized format; names already present in 'result' have their flags ANDed with the
// stored flags, so 'result' may be accumulated over several calls.
virtual bool getSummary(SummaryType type, SummaryMap &result) const = 0;

virtual unsigned getCodeVersion() const = 0;
virtual unsigned getWuidVersion() const = 0;
Expand Down Expand Up @@ -1400,6 +1436,7 @@ interface IWorkUnit : extends IConstWorkUnit
virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val) = 0;
virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend) = 0;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree = nullptr) = 0;
// Store 'map' as the summary of the given type.  In the CLocalWorkUnit implementation the map is
// serialized as newline-separated "<hex flags>:<name>" entries under the "Summaries" element.
virtual void setSummary(SummaryType type, const SummaryMap &map) = 0;
virtual IConstWorkUnit * unlock() = 0;
};

Expand Down Expand Up @@ -1722,6 +1759,8 @@ extern WORKUNIT_API void gatherLibraryNames(StringArray &names, StringArray &unr
//If we add any more parameters we should consider returning an object that can be updated
extern WORKUNIT_API void associateLocalFile(IWUQuery * query, WUFileType type, const char * name, const char * description, unsigned crc, unsigned minActivity=0, unsigned maxActivity=0);

// Stores 'map' on 'wu' as the summary of kind 'summaryType' (calls wu->setSummary); 'wu' must not be null.
extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map);

interface ITimeReporter;
extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid=0);
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer);
Expand Down
3 changes: 3 additions & 0 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public:
void setTimeScheduled(const IJlibDateTime &val);
virtual void subscribe(WUSubscribeOptions options) {};

// Summary lists are kept as properties beneath the "Summaries" element of the workunit tree
// (definitions in workunit.cpp).  getSummary merges the stored list into 'map' and returns false
// if none is stored or it cannot be parsed; setSummary serializes 'map' and stores it.
virtual bool getSummary(SummaryType type, SummaryMap &map) const override;
virtual void setSummary(SummaryType type, const SummaryMap &map) override;

// ILocalWorkUnit - used for debugging etc
void loadXML(const char *xml);
void serialize(MemoryBuffer &tgt);
Expand Down
14 changes: 8 additions & 6 deletions ecl/hqlcpp/hqlckey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class KeyedJoinInfo : public CInterface
// True when extractJoinFieldsTransform is non-null.
bool needToExtractJoinFields() const { return extractJoinFieldsTransform != NULL; }
// True when monitors reports an extra filter or fileFilter is set.
bool hasPostFilter() const { return monitors->queryExtraFilter() || fileFilter; }
// Returns the hasComplexIndex member.
bool requireActivityForKey() const { return hasComplexIndex; }
// True when the key expression has the _signed_Atom attribute.  'key' is dereferenced
// unconditionally.
bool isKeySigned() { return key->hasAttribute(_signed_Atom); }
// True when 'file' is set and has the _signed_Atom attribute; false when 'file' is not set.
bool isFileSigned() { return file && file->hasAttribute(_signed_Atom); }

// Forwards 'cond' to monitors->reportFailureReason().
void reportFailureReason(IHqlExpression * cond) { monitors->reportFailureReason(cond); }
// Returns the createValueSets member.
bool useValueSets() const { return createValueSets; }
Expand Down Expand Up @@ -1192,7 +1194,7 @@ void HqlCppTranslator::buildKeyedJoinExtra(ActivityInstance & instance, IHqlExpr

//virtual const char * getFileName() = 0; // Returns filename of raw file fpos'es refer into
if (info->isFullJoin())
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()));
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()), SummaryType::ReadFile, info->isKeyOpt(), info->isFileSigned());

//virtual bool diskAccessRequired() = 0;
if (info->isFullJoin())
Expand Down Expand Up @@ -1229,7 +1231,7 @@ void HqlCppTranslator::buildKeyJoinIndexReadHelper(ActivityInstance & instance,
info->buildExtractIndexReadFields(instance.startctx);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()));
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()), SummaryType::ReadIndex, info->isKeyOpt(), info->isKeySigned());

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info->queryOriginalKey();
Expand Down Expand Up @@ -1489,7 +1491,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyedDistribute(BuildCtx & ctx
doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic);
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic, SummaryType::ReadIndex, info.isKeyOpt(), info.isKeySigned());

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info.queryRawKey();
Expand Down Expand Up @@ -1583,7 +1585,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyDiff(BuildCtx & ctx, IHqlEx
noteAllFieldsUsed(updated);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteFile, false, expr->hasAttribute(_signed_Atom));

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down Expand Up @@ -1626,10 +1628,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyPatch(BuildCtx & ctx, IHqlE
noteAllFieldsUsed(original);

//virtual const char * getPatchName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true);
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true, SummaryType::ReadFile, false, false);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteIndex, false, false);

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down
Loading

0 comments on commit 3c70c0d

Please sign in to comment.