Skip to content

Commit

Permalink
HPCC-32031 Generate summary information in workunit to speed up file …
Browse files Browse the repository at this point in the history
…list operations

Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Jun 19, 2024
1 parent 88fa27e commit 40dc79d
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 26 deletions.
50 changes: 49 additions & 1 deletion common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4437,6 +4437,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ return c->getFileAccessCost(); }
virtual cost_type getCompileCost() const
{ return c->getCompileCost(); }
// Summary read accessor: forwards the request unchanged to the wrapped
// work unit `c` and returns whatever that implementation returns.
virtual bool getSummary(SummaryType type, SummaryMap &map) const override
{ return c->getSummary(type, map); }
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
{ return c->import(wuTree, graphProgressTree); }

Expand Down Expand Up @@ -4503,6 +4505,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ c->setUser(value); }
virtual void setWuScope(const char * value)
{ c->setWuScope(value); }
// Summary write accessor: forwards the summary type and map unchanged to
// the wrapped work unit `c`.
virtual void setSummary(SummaryType type, const SummaryMap &map) override
{ c->setSummary(type, map); }
virtual IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
{ return c->addWorkflowItem(wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor); }
virtual void syncRuntimeWorkflow(IWorkflowItemArray * array)
Expand Down Expand Up @@ -8721,6 +8725,46 @@ void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool
}
}

/**
 * Map a SummaryType to the element name used for it beneath the work unit's
 * "Summaries" branch (the same name is used both when writing and when
 * building the lookup xpath).
 *
 * @param type  The summary category to translate.
 * @return      A pointer to a string literal naming the category.
 *
 * Any value that is not one of the named categories (including
 * SummaryType::NumItems) is rejected via throwUnexpected().
 */
static const char *summaryTypeName(SummaryType type)
{
    switch (type)
    {
    case SummaryType::ReadFile:   return "ReadFile";
    case SummaryType::ReadIndex:  return "ReadIndex";
    case SummaryType::WriteFile:  return "WriteFile";
    case SummaryType::WriteIndex: return "WriteIndex";
    case SummaryType::SpillFile:  return "SpillFile";
    case SummaryType::Service:    return "Service";
    default:
        throwUnexpected();
    }
}

/**
 * Load the stored summary entries of one category into @p map.
 *
 * Looks up "Summaries/<type name>" in the work unit tree and, for every "v"
 * child found there, records its value as a key in @p map (mapped to true).
 * Entries already present in @p map are left in place; the map is not cleared.
 *
 * @param type  Which summary category to read.
 * @param map   Receives one key per stored value.
 * @return      false if the work unit has no branch for this category;
 *              true if the branch exists, even if it holds no values.
 */
bool CLocalWorkUnit::getSummary(SummaryType type, SummaryMap &map) const
{
    // The xpath is built before taking the lock; summaryTypeName() may throw
    // for an invalid type and needs no access to shared state.
    VStringBuffer xpath("Summaries/%s", summaryTypeName(type));
    CriticalBlock block(crit);
    IPropertyTree *list = p->queryPropTree(xpath);
    if (!list)
        return false;
    Owned<IPropertyTreeIterator> r = list->getElements("v");
    ForEach(*r)
    {
        const char *val = r->query().queryProp(".");
        // Constructing the std::string key from a null pointer would be
        // undefined behaviour, so an element without a value is skipped.
        // NOTE(review): assumes queryProp can return null for a valueless
        // element (e.g. in an imported work unit) - confirm against jptree.
        if (val)
            map[val] = true;
    }
    return true;
}

/**
 * Store the keys of @p map as summary entries of the given category.
 *
 * Ensures that "Summaries" and, beneath it, a branch named after @p type
 * exist, then adds one "v" property per map key. Only the keys are written;
 * the bool associated with each key is ignored. The branches are ensured
 * even when @p map is empty.
 *
 * NOTE(review): entries are added with addProp and nothing is removed first,
 * so this presumably appends when called more than once for the same
 * category - confirm that callers only call it once per type.
 *
 * @param type  Which summary category to write.
 * @param map   Source of the values to store.
 */
void CLocalWorkUnit::setSummary(SummaryType type, const SummaryMap &map)
{
    CriticalBlock block(crit);
    IPropertyTree *root = ensurePTree(p, "Summaries");
    IPropertyTree *bucket = ensurePTree(root, summaryTypeName(type));
    for (auto entry = map.begin(); entry != map.end(); ++entry)
        bucket->addProp("v", entry->first.c_str());
}

void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite)
{
StringBuffer lower;
Expand Down Expand Up @@ -13980,6 +14024,11 @@ extern WORKUNIT_API void descheduleWorkunit(char const * wuid)
doDescheduleWorkkunit(wuid);
}

// Exported helper that stores one category of summary information in a
// work unit by forwarding to IWorkUnit::setSummary.
//   wu          - work unit to update; dereferenced without a null check.
//   summaryType - the summary category the entries belong to.
//   map         - the entries to store; only passed on to setSummary.
// NOTE(review): 'map' is taken by non-const reference although setSummary
// accepts a const reference; it looks like it could be const here as well,
// but the exported declaration would have to change at the same time.
extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map)
{
wu->setSummary(summaryType, map);
}

extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid)
{
StringBuffer scopestr;
Expand Down Expand Up @@ -14008,7 +14057,6 @@ class WuTimingUpdater : implements ITimeReportInfo
StatisticKind kind;
};


extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer)
{
WuTimingUpdater target(wu, SSTsection, StTimeTotalExecute);
Expand Down
19 changes: 19 additions & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <vector>
#include <list>
#include <utility>
#include <map>
#include <string>

#define LEGACY_GLOBAL_SCOPE "workunit"
Expand Down Expand Up @@ -1179,6 +1180,20 @@ interface IConstWUScopeIterator : extends IScmIterator
//---------------------------------------------------------------------------------------------------------------------
//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.

// Categories of summary information that can be recorded against a workunit.
// The values are contiguous from First up to (but not including) NumItems,
// so they can be used as array indexes and iterated with a simple counter.
enum class SummaryType
{
    First = 0,
    ReadFile = First,   // must remain equal to First
    ReadIndex,
    WriteFile,
    WriteIndex,
    SpillFile,
    Service,
    NumItems            // count of categories - not a category itself
};

// Collection of summary entries for one category, keyed by name.
using SummaryMap = std::map<std::string, bool>;

interface IWorkUnit;
interface IUserDescriptor;

Expand Down Expand Up @@ -1267,6 +1282,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual unsigned queryFileUsage(const char * filename) const = 0;
virtual IConstWUFileUsageIterator * getFieldUsage() const = 0;
virtual bool getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const = 0;
virtual bool getSummary(SummaryType type, SummaryMap &result) const = 0;

virtual unsigned getCodeVersion() const = 0;
virtual unsigned getWuidVersion() const = 0;
Expand Down Expand Up @@ -1400,6 +1416,7 @@ interface IWorkUnit : extends IConstWorkUnit
virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val) = 0;
virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend) = 0;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree = nullptr) = 0;
virtual void setSummary(SummaryType type, const SummaryMap &map) = 0;
virtual IConstWorkUnit * unlock() = 0;
};

Expand Down Expand Up @@ -1722,6 +1739,8 @@ extern WORKUNIT_API void gatherLibraryNames(StringArray &names, StringArray &unr
//If we add any more parameters we should consider returning an object that can be updated
extern WORKUNIT_API void associateLocalFile(IWUQuery * query, WUFileType type, const char * name, const char * description, unsigned crc, unsigned minActivity=0, unsigned maxActivity=0);

extern WORKUNIT_API void addWorkunitSummary(IWorkUnit * wu, SummaryType summaryType, SummaryMap &map);

interface ITimeReporter;
extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * description, unsigned __int64 value, unsigned wfid=0);
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer);
Expand Down
3 changes: 3 additions & 0 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public:
void setTimeScheduled(const IJlibDateTime &val);
virtual void subscribe(WUSubscribeOptions options) {};

virtual bool getSummary(SummaryType type, SummaryMap &map) const override;
virtual void setSummary(SummaryType type, const SummaryMap &map) override;

// ILocalWorkUnit - used for debugging etc
void loadXML(const char *xml);
void serialize(MemoryBuffer &tgt);
Expand Down
12 changes: 6 additions & 6 deletions ecl/hqlcpp/hqlckey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ void HqlCppTranslator::buildKeyedJoinExtra(ActivityInstance & instance, IHqlExpr

//virtual const char * getFileName() = 0; // Returns filename of raw file fpos'es refer into
if (info->isFullJoin())
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()));
buildFilenameFunction(instance, instance.createctx, WaFilename, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile()), SummaryType::ReadFile);

//virtual bool diskAccessRequired() = 0;
if (info->isFullJoin())
Expand Down Expand Up @@ -1229,7 +1229,7 @@ void HqlCppTranslator::buildKeyJoinIndexReadHelper(ActivityInstance & instance,
info->buildExtractIndexReadFields(instance.startctx);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()));
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()), SummaryType::ReadIndex);

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info->queryOriginalKey();
Expand Down Expand Up @@ -1489,7 +1489,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyedDistribute(BuildCtx & ctx
doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1);

//virtual const char * getIndexFileName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic);
buildFilenameFunction(*instance, instance->startctx, WaIndexname, "getIndexFileName", keyFilename, dynamic, SummaryType::ReadIndex);

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info.queryRawKey();
Expand Down Expand Up @@ -1583,7 +1583,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyDiff(BuildCtx & ctx, IHqlEx
noteAllFieldsUsed(updated);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteFile);

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down Expand Up @@ -1626,10 +1626,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityKeyPatch(BuildCtx & ctx, IHqlE
noteAllFieldsUsed(original);

//virtual const char * getPatchName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true);
buildFilenameFunction(*instance, instance->startctx, WaPatchFilename, "getPatchName", patch, true, SummaryType::ReadFile);

//virtual const char * getOutputName() = 0;
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaOutputFilename, "getOutputName", output, hasDynamicFilename(expr), SummaryType::WriteIndex);

//virtual int getSequence() = 0;
doBuildSequenceFunc(instance->classctx, querySequence(expr), false);
Expand Down
14 changes: 11 additions & 3 deletions ecl/hqlcpp/hqlcpp.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1886,8 +1886,8 @@ public:
void doBuildFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);
void doBuildUserFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);

void addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr);
void buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic);
void addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, SummaryType summaryType);
void buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic, SummaryType summaryType);
void buildRefFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * dataset);
void createAccessFunctions(StringBuffer & helperFunc, BuildCtx & declarectx, unsigned prio, const char * interfaceName, const char * object);

Expand All @@ -1911,7 +1911,7 @@ protected:
void buildIteratorNext(BuildCtx & ctx, IHqlExpression * iter, IHqlExpression * row);
bool shouldEvaluateSelectAsAlias(BuildCtx & ctx, IHqlExpression * expr);
IWUResult * createWorkunitResult(int sequence, IHqlExpression * nameExpr);
void noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic);
void noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic, SummaryType summaryType);
bool checkGetResultContext(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr & tgt);
void buildGetResultInfo(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr * boundTarget, const CHqlBoundTarget * targetAssign);
void buildGetResultSetInfo(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr * boundTarget, const CHqlBoundTarget * targetAssign);
Expand Down Expand Up @@ -2038,6 +2038,7 @@ protected:
bool isNeverDistributed(IHqlExpression * expr);

void ensureWorkUnitUpdated();
void addWorkunitSummaries();
bool getDebugFlag(const char * name, bool defValue);
void initOptions();
void postProcessOptions();
Expand Down Expand Up @@ -2140,6 +2141,13 @@ protected:
Owned<ITimeReporter> timeReporter;
CIArrayOf<SourceFieldUsage> trackedSources;
HqlExprArray tracedActivities;

// These are used to generate workunit summary info, to avoid having to walk the xgmml to get it
SummaryMap summaries[(int) SummaryType::NumItems];
void noteSummaryInfo(const char *name, SummaryType type)
{
summaries[(int) type][name] = true;
}
};


Expand Down
35 changes: 23 additions & 12 deletions ecl/hqlcpp/hqlhtcpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3382,21 +3382,21 @@ void HqlCppTranslator::doBuildFunction(BuildCtx & ctx, ITypeInfo * type, const c
}
}

void HqlCppTranslator::addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr)
void HqlCppTranslator::addFilenameConstructorParameter(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, SummaryType summaryType)
{
OwnedHqlExpr folded = foldHqlExpression(expr);
instance.addConstructorParameter(folded);
noteFilename(instance, attr, folded, false);
noteFilename(instance, attr, folded, false, summaryType);
}

void HqlCppTranslator::buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic)
void HqlCppTranslator::buildFilenameFunction(ActivityInstance & instance, BuildCtx & classctx, WuAttr attr, const char * name, IHqlExpression * expr, bool isDynamic, SummaryType summaryType)
{
OwnedHqlExpr folded = foldHqlExpression(expr);
doBuildVarStringFunction(classctx, name, folded);
noteFilename(instance, attr, folded, isDynamic);
noteFilename(instance, attr, folded, isDynamic, summaryType);
}

void HqlCppTranslator::noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic)
void HqlCppTranslator::noteFilename(ActivityInstance & instance, WuAttr attr, IHqlExpression * expr, bool isDynamic, SummaryType summaryType)
{
if (options.addFilesnamesToGraph)
{
Expand All @@ -3417,6 +3417,7 @@ void HqlCppTranslator::noteFilename(ActivityInstance & instance, WuAttr attr, IH
StringBuffer propValue;
folded->queryValue()->getStringValue(propValue);
instance.addAttribute(attr, propValue);
noteSummaryInfo(propValue, summaryType);
}
}
if (isDynamic)
Expand Down Expand Up @@ -3459,20 +3460,24 @@ void HqlCppTranslator::buildRefFilenameFunction(ActivityInstance & instance, Bui
assertex(table);

IHqlExpression * filename = NULL;
SummaryType summaryType = SummaryType::ReadFile;
switch (table->getOperator())
{
case no_keyindex:
filename = table->queryChild(2);
summaryType = SummaryType::ReadIndex;
break;
case no_newkeyindex:
filename = table->queryChild(3);
summaryType = SummaryType::ReadIndex;
break;
case no_table:
filename = table->queryChild(0);
summaryType = SummaryType::ReadFile;
break;
}

buildFilenameFunction(instance, classctx, attr, name, filename, hasDynamicFilename(table));
buildFilenameFunction(instance, classctx, attr, name, filename, hasDynamicFilename(table), summaryType);
}

void HqlCppTranslator::buildConnectInputOutput(BuildCtx & ctx, ActivityInstance * instance, ABoundActivity * table, unsigned outputIndex, unsigned inputIndex, const char * label, bool nWay)
Expand Down Expand Up @@ -6236,12 +6241,17 @@ bool HqlCppTranslator::buildCpp(IHqlCppInstance & _code, HqlQueryContext & query
ensureWorkUnitUpdated();
throw;
}
addWorkunitSummaries();
ensureWorkUnitUpdated();


return true;
}

void HqlCppTranslator::addWorkunitSummaries()
{
for (int i = (int) SummaryType::First; i < (int) SummaryType::NumItems; i++)
addWorkunitSummary(wu(), (SummaryType) i, summaries[i]);
}

void HqlCppTranslator::ensureWorkUnitUpdated()
{
if (timeReporter)
Expand Down Expand Up @@ -10659,7 +10669,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutputIndex(BuildCtx & ctx, IH
buildInstancePrefix(instance);

//virtual const char * getFileName() { return "x.d00"; }
buildFilenameFunction(*instance, instance->startctx, WaFilename, "getFileName", filename, hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaFilename, "getFileName", filename, hasDynamicFilename(expr), SummaryType::WriteIndex);

//virtual unsigned getFlags() = 0;
IHqlExpression * updateAttr = expr->queryAttribute(updateAtom);
Expand Down Expand Up @@ -10710,7 +10720,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutputIndex(BuildCtx & ctx, IH

IHqlExpression * indexNameAttr = expr->queryAttribute(indexAtom);
if (indexNameAttr)
buildFilenameFunction(*instance, instance->startctx, WaDistributeIndexname, "getDistributeIndexName", indexNameAttr->queryChild(0), hasDynamicFilename(expr));
buildFilenameFunction(*instance, instance->startctx, WaDistributeIndexname, "getDistributeIndexName", indexNameAttr->queryChild(0), hasDynamicFilename(expr), SummaryType::ReadIndex);

buildExpiryHelper(instance->createctx, expr->queryAttribute(expireAtom));
buildUpdateHelper(instance->createctx, *instance, dataset, updateAttr);
Expand Down Expand Up @@ -11061,7 +11071,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
if (filename && filename->getOperator() != no_pipe)
{
bool isDynamic = expr->hasAttribute(resultAtom) || hasDynamicFilename(expr);
buildFilenameFunction(*instance, instance->startctx, WaFilename, "getFileName", filename, isDynamic);
buildFilenameFunction(*instance, instance->startctx, WaFilename, "getFileName", filename, isDynamic, SummaryType::WriteFile);
if (!filename->isConstant())
constFilename = false;
}
Expand Down Expand Up @@ -11163,7 +11173,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
{
assertex(tempCount.get() && !hasDynamic(expr));
instance->addConstructorParameter(tempCount);
addFilenameConstructorParameter(*instance, WaFilename, filename);
addFilenameConstructorParameter(*instance, WaFilename, filename, SummaryType::WriteFile);
}

instance->addSignedAttribute(expr->queryAttribute(_signed_Atom));
Expand Down Expand Up @@ -18050,6 +18060,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre
StringBuffer serviceName;
getUTF8Value(serviceName, service);
instance->addAttribute(WaServiceName, serviceName);
noteSummaryInfo(serviceName, SummaryType::Service);
}

enum class ReqFormat { NONE, XML, JSON, FORM_ENCODED };
Expand Down
3 changes: 2 additions & 1 deletion ecl/hqlcpp/hqlres.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ bool ResourceManager::flush(StringBuffer &filename, const char *basename, bool f
}
fwrite(s.data.get(), 1, s.data.length(), bin);
fclose(bin);
fprintf(f, " .size %s,%u\n", label.str(), (unsigned)s.data.length());
if (!generateClang)
fprintf(f, " .size %s,%u\n", label.str(), (unsigned)s.data.length());
}
fclose(f);
#endif
Expand Down
Loading

0 comments on commit 40dc79d

Please sign in to comment.