Skip to content

Commit

Permalink
HPCC-30911 Store read and write cost as file attributes
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>

Changes following review

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Nov 29, 2023
1 parent 3c870a7 commit 9e3b9f4
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 63 deletions.
97 changes: 64 additions & 33 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,41 +177,49 @@ static IPropertyTree *getEmptyAttr()
return createPTree("Attr");
}

extern da_decl void calcFileCost(const char * cluster, double sizeGB, double fileAgeDays, __int64 numDiskWrites, __int64 numDiskReads, double & atRestCost, double & accessCost)
// Locate the cost configuration to use for a cluster: prefer the storage
// plane's own cost branch (when it defines @storageAtRest), otherwise fall
// back to the global configuration's cost branch.
// Returns a linked IPropertyTree (caller owns the link); may return nullptr
// when no cost configuration exists at either level.
static IPropertyTree *getCostPropTree(const char *cluster)
{
    Owned<IPropertyTree> plane = getStoragePlane(cluster);
    if (plane && plane->hasProp("cost/@storageAtRest"))
    {
        // Plane-level cost settings take precedence over the global defaults
        return plane->getPropTree("cost");
    }
    else
    {
        return getGlobalConfigSP()->getPropTree("cost");
    }
}

// Calculate the at-rest (storage) cost of a file in money units.
// @param cluster      name of the cluster/storage plane holding the file
// @param sizeGB       file size in gigabytes
// @param fileAgeDays  age of the file in days
// @return accumulated storage cost, or 0.0 when no cost config exists
extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays)
{
    Owned<IPropertyTree> costPT = getCostPropTree(cluster);

    if (costPT==nullptr)
        return 0.0;
    double atRestPrice = costPT->getPropReal("@storageAtRest", 0.0);
    // @storageAtRest is a monthly price per GB; convert to a daily rate
    double storageCostDaily = atRestPrice * 12 / 365;
    return storageCostDaily * sizeGB * fileAgeDays;
}

// Calculate the access cost of a file from its disk operation counts.
// @param cluster        name of the cluster/storage plane holding the file
// @param numDiskWrites  number of disk write operations
// @param numDiskReads   number of disk read operations
// @return combined read+write access cost, or 0.0 when no cost config exists
extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads)
{
    Owned<IPropertyTree> costPT = getCostPropTree(cluster);

    if (costPT==nullptr)
        return 0.0;
    constexpr int accessPriceScalingFactor = 10000; // read/write pricing based on 10,000 operations
    double readPrice = costPT->getPropReal("@storageReads", 0.0);
    double writePrice = costPT->getPropReal("@storageWrites", 0.0);
    return (readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor);
}

// Convenience overload: derive the cluster name from the file itself and
// delegate to the cluster-name variant.
// Should really specify the cluster number too, but this is the best we can do for now
extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
    StringBuffer primaryCluster;
    f->getClusterName(0, primaryCluster);
    return calcFileAccessCost(primaryCluster, numDiskWrites, numDiskReads);
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
Expand Down Expand Up @@ -4941,27 +4949,39 @@ protected: friend class CDistributedFilePart;
double fileAgeDays = difftime(time(nullptr), dt.getSimple())/(24*60*60);
double sizeGB = getDiskSize(true, false) / ((double)1024 * 1024 * 1024);
const IPropertyTree *attrs = root->queryPropTree("Attr");
bool doLegacyAccessCostCalc = false;
__int64 numDiskWrites = 0, numDiskReads = 0;
if (attrs)
{
numDiskWrites = attrs->getPropInt64("@numDiskWrites");
numDiskReads = attrs->getPropInt64("@numDiskReads");
if (hasReadWriteCostFields(attrs))
{
// Newer files have readCost and writeCost attributes
accessCost = cost_type2money(attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)));
}
else
{
// (Costs need to be calculated from numDiskReads and numDiskWrites for legacy files)
numDiskWrites = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites));
numDiskReads = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads));
doLegacyAccessCostCalc = true;
}
}
if (isEmptyString(cluster))
{
StringArray clusterNames;
unsigned countClusters = getClusterNames(clusterNames);
for (unsigned i = 0; i < countClusters; i++)
{
double tmpAtRestcost, tmpAccessCost;
calcFileCost(clusterNames[i], sizeGB, fileAgeDays, numDiskWrites, numDiskReads, tmpAtRestcost, tmpAccessCost);
atRestCost += tmpAtRestcost;
accessCost += tmpAccessCost;
atRestCost += calcFileAtRestCost(clusterNames[i], sizeGB, fileAgeDays);
if (doLegacyAccessCostCalc)
accessCost += calcFileAccessCost(clusterNames[i], numDiskWrites, numDiskReads);
}
}
else
{
calcFileCost(cluster, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
atRestCost += calcFileAtRestCost(cluster, sizeGB, fileAgeDays);
if (doLegacyAccessCostCalc)
accessCost += calcFileAccessCost(cluster, numDiskWrites, numDiskReads);
}
}
};
Expand Down Expand Up @@ -13340,11 +13360,12 @@ IDFProtectedIterator *CDistributedFileDirectory::lookupProtectedFiles(const char
// Attribute names corresponding to the DFUQResultField enum (dadfs.hpp);
// the order must match the enum values exactly — append only.
const char* DFUQResultFieldNames[] = { "@name", "@description", "@group", "@kind", "@modified", "@job", "@owner",
    "@DFUSFrecordCount", "@recordCount", "@recordSize", "@DFUSFsize", "@size", "@workunit", "@DFUSFcluster", "@numsubfiles",
    "@accessed", "@numparts", "@compressedSize", "@directory", "@partmask", "@superowners", "@persistent", "@protect", "@compressed",
    "@cost", "@numDiskReads", "@numDiskWrites", "@atRestCost", "@accessCost", "@maxSkew", "@minSkew", "@maxSkewPart", "@minSkewPart",
    "@readCost", "@writeCost" };

// Map a DFUQResultField enum value to its stored attribute name.
// @param field  enum value (must be < DFUQRFterm; no bounds check performed)
extern da_decl const char* getDFUQResultFieldName(DFUQResultField field)
{
    return DFUQResultFieldNames[field];
}

IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned numFiles, DFUQResultField* localFilters, const char* localFilterBuf)
Expand Down Expand Up @@ -13425,10 +13446,20 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu
else
sizeDiskSize = file->getPropInt64(getDFUQResultFieldName(DFUQRForigsize), 0);
double sizeGB = sizeDiskSize / ((double)1024 * 1024 * 1024);
__int64 numDiskWrites = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
__int64 numDiskReads = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
double atRestCost, accessCost;
calcFileCost(nodeGroup, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
double accessCost = 0.0;
if (hasReadWriteCostFields(file))
{
// Newer files have readCost and writeCost attributes
accessCost = cost_type2money(file->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + file->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)));
}
else
{
// (Costs need to be calculated from numDiskReads and numDiskWrites for legacy files)
__int64 numDiskWrites = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
__int64 numDiskReads = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
accessCost = calcFileAccessCost(nodeGroup, numDiskWrites, numDiskReads);
}
double atRestCost = calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays);
file->setPropReal(getDFUQResultFieldName(DFUQRFcost), atRestCost+accessCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFaccessCost), accessCost);
Expand Down
17 changes: 12 additions & 5 deletions dali/base/dadfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,17 @@ enum DFUQResultField
DFUQRFminSkew = 30,
DFUQRFmaxSkewPart = 31,
DFUQRFminSkewPart = 32,
DFUQRFterm = 33,
DFUQRFreadCost = 33,
DFUQRFwriteCost = 34,
DFUQRFterm = 35, // must be last in list
DFUQRFreverse = 256,
DFUQRFnocase = 512,
DFUQRFnumeric = 1024,
DFUQRFfloat = 2048
};

// Lookups from DFU query field enums to their attribute-name strings.
extern da_decl const char* getDFUQFilterFieldName(DFUQFilterField field);
extern da_decl const char* getDFUQResultFieldName(DFUQResultField field);

/**
* File operations can be included in a transaction to ensure that multiple
Expand Down Expand Up @@ -886,11 +888,16 @@ inline const char *queryFileKind(IFileDescriptor *f) { return queryFileKind(f->q
extern da_decl void ensureFileScope(const CDfsLogicalFileName &dlfn, unsigned timeoutms=INFINITE);

extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool readreq,bool createreq,bool allowquery,const char *specialnotallowedmsg);

// Cost helpers: at-rest cost is driven by file size and age; access cost by
// disk read/write operation counts.
extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays);
extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads);
constexpr bool defaultPrivilegedUser = true;
constexpr bool defaultNonPrivilegedUser = false;

extern da_decl void configurePreferredPlanes();

// True when the file metadata already carries the newer @readCost/@writeCost
// attributes; legacy files only have @numDiskReads/@numDiskWrites counters.
inline bool hasReadWriteCostFields(const IPropertyTree *pt)
{
    if (pt->hasProp(getDFUQResultFieldName(DFUQRFreadCost)))
        return true;
    return pt->hasProp(getDFUQResultFieldName(DFUQRFwriteCost));
}
#endif
21 changes: 19 additions & 2 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3592,7 +3592,10 @@ void FileSprayer::updateTargetProperties()

DistributedFilePropertyLock lock(distributedTarget);
IPropertyTree &curProps = lock.queryAttributes();
curProps.setPropInt64("@numDiskWrites", totalNumWrites);
cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0));
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);

if (calcCRC())
curProps.setPropInt(FAcrc, totalCRC.get());
curProps.setPropInt64(FAsize, totalLength);
Expand Down Expand Up @@ -3771,7 +3774,21 @@ void FileSprayer::updateTargetProperties()
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
distributedSource->addAttrValue("@numDiskReads", totalNumReads);
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type readCost = 0;
// Check if it is a legacy file without @readCost attribute -> calculate prevReadCost
if (!hasReadWriteCostFields(&fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
stat_type totalReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0) + totalNumReads;
readCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalReads));
}
else
readCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads));

distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), readCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
}
}
if (error)
throw error.getClear();
Expand Down
21 changes: 17 additions & 4 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ void CHThorDiskWriteActivity::publish()
if (helper.getFlags() & TDWrestricted)
properties.setPropBool("restricted", true);

properties.setPropInt64("@numDiskWrites", numDiskWrites);
properties.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), numDiskWrites);
StringBuffer lfn;
expandLogicalFilename(lfn, mangledHelperFileName.str(), agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false);
CDfsLogicalFileName logicalName;
Expand All @@ -790,6 +790,7 @@ void CHThorDiskWriteActivity::publish()
StringBuffer clusterName;
file->getClusterName(0, clusterName);
diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0));
properties.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
}
file->attach(logicalName.get(), agent.queryCodeContext()->queryUserDescriptor());
agent.logFileAccess(file, "HThor", "CREATED", graph);
Expand Down Expand Up @@ -1366,7 +1367,7 @@ void CHThorIndexWriteActivity::execute()
properties.setProp("@workunit", agent.queryWorkUnit()->queryWuid());
properties.setProp("@job", agent.queryWorkUnit()->queryJobName());
properties.setPropInt64("@duplicateKeyCount",duplicateKeyCount);
properties.setPropInt64("@numDiskWrites", numDiskWrites);
properties.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), numDiskWrites);
properties.setPropInt64("@numLeafNodes", numLeafNodes);
properties.setPropInt64("@numBranchNodes", numBranchNodes);
properties.setPropInt64("@numBlobNodes", numBlobNodes);
Expand Down Expand Up @@ -1437,6 +1438,7 @@ void CHThorIndexWriteActivity::execute()
StringBuffer clusterName;
dfile->getClusterName(0, clusterName);
diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0));
properties.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), diskAccessCost);
}
else
lfn = filename;
Expand Down Expand Up @@ -8539,10 +8541,21 @@ void CHThorDiskReadBaseActivity::closepart()
dFile = &(super->querySubFile(subfile, true));
}
}
dFile->addAttrValue("@numDiskReads", curDiskReads);
StringBuffer clusterName;
dFile->getClusterName(0, clusterName);
diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
cost_type readCost = 0;
IPropertyTree & fileAttr = dFile->queryAttributes();
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(&fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
readCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads+curDiskReads));
}
else
readCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
dFile->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), readCost);
dFile->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
}
numDiskReads += curDiskReads;
}
Expand Down
47 changes: 28 additions & 19 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,27 @@ void CMasterActivity::done()

void CMasterActivity::updateFileReadCostStats()
{
// Updates numDiskReads & readCost in the file attributes and returns the readCost
auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats)
{
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
cost_type legacyReadCost = 0, curReadCost = 0;
IPropertyTree & fileAttr = file->queryAttributes();
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(&fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
return curReadCost;
};

if (fileStats.size()>0)
{
unsigned fileIndex = 0;
Expand All @@ -664,21 +685,13 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<numSubFiles; i++)
{
IDistributedFile &subFile = super->querySubFile(i, true);
stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
subFile.getClusterName(0, clusterName);
diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
subFile.addAttrValue("@numDiskReads", numDiskReads);
diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]);
fileIndex++;
}
}
else
{
stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
file->addAttrValue("@numDiskReads", numDiskReads);
diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]);
fileIndex++;
}
}
Expand All @@ -688,25 +701,21 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile *file = queryReadFile(0);
if (file)
{
stat_type numDiskReads = statsCollection.getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
file->addAttrValue("@numDiskReads", numDiskReads);
}
diskAccessCost += updateReadCosts(file, statsCollection);
}
}

// Record write statistics for a written file: stores the disk-write count and
// the derived write cost in the file's properties, and accumulates the cost
// into this activity's diskAccessCost. No-op when nothing was written.
void CMasterActivity::updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites)
{
    if (numDiskWrites)
    {
        props.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), numDiskWrites);
        assertex(fileDesc.numClusters()>=1);
        StringBuffer clusterName;
        fileDesc.getClusterGroupName(0, clusterName);// Note: calculating for 1st cluster. (Future: calc for >1 clusters)
        cost_type writeCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0));
        props.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
        diskAccessCost += writeCost;
    }
}

Expand Down

0 comments on commit 9e3b9f4

Please sign in to comment.