Skip to content

Commit

Permalink
HPCC-30937 Update readCost, writeCost, numDiskReads and numDiskWrites…
Browse files Browse the repository at this point in the history
… periodically

Previously, readCost, writeCost, numDiskReads and numDiskWrites were
only written to the workunit at the end of the activity.  This meant that
users were unable to view these stats while the activity was running.
Furthermore, as these costs were calculated at the end of the activity,
the guillotine was not always effective until the activity had finished
(i.e., activities with heavy disk access could exceed the guillotine
threshold substantially before being terminated).

This commit updates readCost, writeCost, numDiskReads and numDiskWrites
periodically (with every serialization of the stats to master).  The
file properties will still only be updated at the end of the activity,
to avoid creating extra load on dali.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Dec 21, 2023
1 parent c04f44b commit d50ad91
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 41 deletions.
10 changes: 10 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWri
return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads);
}

// Returns the cost of performing numDiskWrites disk writes, accumulated over every
// cluster named in clusters.  Each cluster's cost is converted to cost_type before
// being added.  Returns 0 immediately when there were no writes.
extern da_decl cost_type calcDiskWriteCost(const StringArray & clusters, stat_type numDiskWrites)
{
    if (numDiskWrites == 0)
        return 0;

    cost_type total = 0;
    ForEachItemIn(c, clusters)
    {
        // Writes only: the read count passed to calcFileAccessCost is always 0 here
        total += money2cost_type(calcFileAccessCost(clusters.item(c), numDiskWrites, 0));
    }
    return total;
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
{
partno--;
Expand Down
1 change: 1 addition & 0 deletions dali/base/dadfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool
extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays);
extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl cost_type calcDiskWriteCost(const StringArray & clusters, stat_type numDiskWrites);
constexpr bool defaultPrivilegedUser = true;
constexpr bool defaultNonPrivilegedUser = false;

Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/fetch/thfetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,16 @@ class CFetchActivityMaster : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
// Gathers the base activity statistics, then refreshes diskAccessCost from the current
// file read costs (updateFileProps=false, so file attributes are not modified) and
// reports it as StCostFileAccess when it is non-zero.
virtual void getActivityStats(IStatisticGatherer & stats) override
{
    CMasterActivity::getActivityStats(stats);
    diskAccessCost = calcFileReadCostStats(false);
    if (!diskAccessCost)
        return; // nothing to report yet
    stats.addStatistic(StCostFileAccess, diskAccessCost);
}
// Records the final read cost in diskAccessCost, then delegates to CMasterActivity::done().
// NOTE(review): this diff view lists both the pre-commit call (updateFileReadCostStats) and its
// replacement (calcFileReadCostStats); the thgraphmaster.ipp hunk declares only the latter - confirm
// that only the calcFileReadCostStats(true) call remains in the committed file.
virtual void done() override
{
updateFileReadCostStats();
// updateFileProps=true: per the comment on calcFileReadCostStats this also writes the read cost
// and disk-read count to the file attributes, and must happen once per activity to avoid double counting
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/indexread/thindexread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,16 @@ class CIndexReadBase : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
// Emits the base activity statistics and then the running file access cost.
// The cost is recalculated on each call with updateFileProps=false, i.e. without
// writing anything back to the file attributes; a zero cost is not reported.
virtual void getActivityStats(IStatisticGatherer & stats) override
{
    CMasterActivity::getActivityStats(stats);
    diskAccessCost = calcFileReadCostStats(false);
    if (!diskAccessCost)
        return; // no cost accrued so far
    stats.addStatistic(StCostFileAccess, diskAccessCost);
}
// Stores the final read cost in diskAccessCost before running CMasterActivity::done().
// NOTE(review): both the old call (updateFileReadCostStats) and the new one appear here because
// the diff view has no +/- markers; the thgraphmaster.ipp hunk declares only
// calcFileReadCostStats - confirm which line survives.
virtual void done() override
{
updateFileReadCostStats();
// Passing true asks calcFileReadCostStats to also update the file attributes (see its header comment)
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
6 changes: 5 additions & 1 deletion thorlcr/activities/indexwrite/thindexwrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class IndexWriteActivityMaster : public CMasterActivity
props.setPropInt64("@totalCRC", totalCRC);
}
props.setPropInt("@formatCrc", helper->getFormatCrc());
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
if (isLocal)
{
props.setPropBool("@local", true);
Expand All @@ -310,7 +312,6 @@ class IndexWriteActivityMaster : public CMasterActivity
bloom->setProp("@bloomProbability", pval.str());
}
container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
updateFileWriteCostStats(*fileDesc, props, statsCollection.getStatisticSum(StNumDiskWrites));
if (!dlfn.isExternal())
queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc);
}
Expand Down Expand Up @@ -418,6 +419,9 @@ class IndexWriteActivityMaster : public CMasterActivity
{
CMasterActivity::getActivityStats(stats);
stats.addStatistic(StNumDuplicateKeys, cummulativeDuplicateKeyCount);
diskAccessCost = calcDiskWriteCost(clusters, statsCollection.getStatisticSum(StNumDiskWrites));
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}
};

Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/keyedjoin/thkeyedjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,16 @@ class CKeyedJoinMaster : public CMasterActivity
for (unsigned i=0; i<numFilesToRead; i++)
fileStats[i]->deserialize(node, mb);
}
// Adds this activity's statistics to the gatherer: first the base-class statistics,
// then StCostFileAccess if the freshly calculated read cost is non-zero.
// calcFileReadCostStats is called with updateFileProps=false so file attributes are untouched.
virtual void getActivityStats(IStatisticGatherer & stats) override
{
    CMasterActivity::getActivityStats(stats);
    diskAccessCost = calcFileReadCostStats(false);
    if (!diskAccessCost)
        return; // zero cost is not published
    stats.addStatistic(StCostFileAccess, diskAccessCost);
}
// Captures the final read cost in diskAccessCost and then calls CMasterActivity::done().
// NOTE(review): updateFileReadCostStats() looks like the pre-commit line shown alongside its
// replacement (the diff view carries no +/- markers) - confirm against the committed file.
virtual void done() override
{
updateFileReadCostStats();
// updateFileProps=true: the file attributes are updated as well as the cost being returned
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}
};
Expand Down
21 changes: 19 additions & 2 deletions thorlcr/activities/thdiskbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,17 @@ void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
CSlavePartMapping::serializeNullMap(dst);
}

// Reports the base-class statistics followed by the current file access cost.
// The read cost is recomputed each time (updateFileProps=false, so no file attributes
// are written) and only added to the gatherer when it is non-zero.
void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats)
{
    CMasterActivity::getActivityStats(stats);
    diskAccessCost = calcFileReadCostStats(false);
    if (!diskAccessCost)
        return; // nothing to add
    stats.addStatistic(StCostFileAccess, diskAccessCost);
}

// Stores the final read cost in diskAccessCost and then runs CMasterActivity::done().
// NOTE(review): the diff view shows the removed updateFileReadCostStats() call next to the
// calcFileReadCostStats(true) call that replaces it - confirm only the latter is in the committed file.
void CDiskReadMasterBase::done()
{
updateFileReadCostStats();
// updateFileProps=true: also records the read cost and disk reads on the file attributes
diskAccessCost = calcFileReadCostStats(true);
CMasterActivity::done();
}

Expand Down Expand Up @@ -284,7 +292,8 @@ void CWriteMasterBase::publish()
}
if (TDWrestricted & diskHelperBase->getFlags())
props.setPropBool("restricted", true );
updateFileWriteCostStats(*fileDesc, props, statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), statsCollection.getStatisticSum(StNumDiskWrites));
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), diskHelperBase->getTempUsageCount(), TDXtemporary & diskHelperBase->getFlags(), getDiskOutputKind(diskHelperBase->getFlags()), &clusters);
if (!dlfn.isExternal())
{
Expand Down Expand Up @@ -432,6 +441,14 @@ void CWriteMasterBase::done()
}
}

// Reports the base-class statistics, then recomputes diskAccessCost from the summed
// StNumDiskWrites statistic across the output clusters and reports it as
// StCostFileAccess when non-zero.
void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
{
    CMasterActivity::getActivityStats(stats);

    const auto diskWrites = statsCollection.getStatisticSum(StNumDiskWrites);
    diskAccessCost = calcDiskWriteCost(clusters, diskWrites);
    if (!diskAccessCost)
        return; // no write cost to report
    stats.addStatistic(StCostFileAccess, diskAccessCost);
}

void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
{
if (mb.length()) // if 0 implies aborted out from this slave.
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/thdiskbase.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public:
virtual void init() override;
virtual void kill() override;
virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override;
virtual void getActivityStats(IStatisticGatherer & stats) override;
virtual void done() override;
virtual void validateFile(IDistributedFile *file) { }
virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override;
Expand All @@ -62,6 +63,7 @@ public:
virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
virtual void init();
virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
virtual void getActivityStats(IStatisticGatherer & stats) override;
virtual void done();
virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb);
};
Expand Down
59 changes: 26 additions & 33 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,6 @@ void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
// Base implementation: passes the statistics held in statsCollection to the gatherer.
// NOTE(review): the hunk header (-611,8 +611,6) indicates two lines were removed here; the
// diskAccessCost check below appears to be those lines, with the derived activities in this
// commit now adding StCostFileAccess themselves - confirm against the committed file.
void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
{
statsCollection.getStats(stats);
if (diskAccessCost)
stats.addStatistic(StCostFileAccess, diskAccessCost);
}

void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
Expand Down Expand Up @@ -647,22 +645,18 @@ void CMasterActivity::done()
}
}

void CMasterActivity::updateFileReadCostStats()
// calcFileReadCostStats calculates and returns the read costs for all files read by the activity
// In addition, if updateFileProps==true, it updates the file attributes with @readCost and @numDiskReads
// Note: should be called once per activity with "updateFileProps==true" to avoid double counting
cost_type CMasterActivity::calcFileReadCostStats(bool updateFileProps)
{
// Update numDiskReads & readCost in the file attributes and return readCost
auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
// Updates numDiskReads & readCost in the file attributes (only when updateFileProps is set) and returns the readCost
auto updateReadCosts = [updateFileProps](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
{
StringBuffer clusterName;
file->getClusterName(0, clusterName);
IPropertyTree & fileAttr = file->queryAttributes();
cost_type legacyReadCost = 0, curReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}
cost_type curReadCost = 0;
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
if(useJhtreeCacheStats)
{
Expand All @@ -673,11 +667,23 @@ void CMasterActivity::updateFileReadCostStats()
}
else
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);

if (updateFileProps)
{
cost_type legacyReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
}
return curReadCost;
};

cost_type readCost = 0;
if (fileStats.size()>0)
{
ThorActivityKind activityKind = container.getKind();
Expand All @@ -700,13 +706,13 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<numSubFiles; i++)
{
IDistributedFile &subFile = super->querySubFile(i, true);
diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
readCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
fileIndex++;
}
}
else
{
diskAccessCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
readCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
fileIndex++;
}
}
Expand All @@ -716,22 +722,9 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile *file = queryReadFile(0);
if (file)
diskAccessCost += updateReadCosts(true, file, statsCollection);
}
}

void CMasterActivity::updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites)
{
if (numDiskWrites)
{
props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), numDiskWrites);
assertex(fileDesc.numClusters()>=1);
StringBuffer clusterName;
fileDesc.getClusterGroupName(0, clusterName);// Note: calculating for 1st cluster. (Future: calc for >1 clusters)
cost_type writeCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0));
props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
diskAccessCost = writeCost;
readCost = updateReadCosts(true, file, statsCollection);
}
return readCost;
}

//////////////////////
Expand Down
3 changes: 1 addition & 2 deletions thorlcr/graph/thgraphmaster.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ protected:
unsigned queryReadFileId(const char *lfnName);
IDistributedFile *findReadFile(const char *lfnName);
IDistributedFile *lookupReadFile(const char *lfnName, AccessMode mode, bool jobTemp, bool temp, bool opt, bool statsForMultipleFiles=false, const StatisticsMapping &statsMapping=diskReadRemoteStatistics, unsigned * fileStatsStartEntry=nullptr);
void updateFileReadCostStats();
void updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites);
cost_type calcFileReadCostStats(bool updateFileProps);
virtual void process() { }
public:
IMPLEMENT_IINTERFACE_USING(CActivityBase)
Expand Down

0 comments on commit d50ad91

Please sign in to comment.