Changes following review
Signed-off-by: Shamser Ahmed <[email protected]>
shamser committed Nov 22, 2023
1 parent 6fd995b commit c3b17fc
Showing 12 changed files with 175 additions and 126 deletions.
30 changes: 26 additions & 4 deletions dali/base/dadfs.cpp
@@ -217,6 +217,7 @@ extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWr
extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
StringBuffer clusterName;
// Should really specify the cluster number too, but this is the best we can do for now
f->getClusterName(0, clusterName);
return money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, numDiskReads));
}
@@ -4949,10 +4950,19 @@ protected: friend class CDistributedFilePart;
double sizeGB = getDiskSize(true, false) / ((double)1024 * 1024 * 1024);
const IPropertyTree *attrs = root->queryPropTree("Attr");
__int64 numDiskWrites = 0, numDiskReads = 0;
cost_type readCost = 0;
bool recalcReadCost = false;
if (attrs)
{
numDiskWrites = attrs->getPropInt64("@numDiskWrites");
numDiskReads = attrs->getPropInt64("@numDiskReads");
readCost = attrs->getPropInt64("@readCost", MaxStatisticValue);
if (readCost==MaxStatisticValue)
{
// backward compatibility: no read cost field so use numDiskReads to calc cost
numDiskReads = attrs->getPropInt64("@numDiskReads");
readCost = 0; // will need to recalculate
recalcReadCost = true;
}
}
if (isEmptyString(cluster))
{
@@ -4961,15 +4971,20 @@ protected: friend class CDistributedFilePart;
for (unsigned i = 0; i < countClusters; i++)
{
double tmpAtRestcost, tmpAccessCost;
calcFileCost(clusterNames[i], sizeGB, fileAgeDays, numDiskWrites, numDiskReads, tmpAtRestcost, tmpAccessCost);
calcFileCost(clusterNames[i], sizeGB, fileAgeDays, numDiskWrites, 0, tmpAtRestcost, tmpAccessCost);
if (recalcReadCost)
readCost += money2cost_type(calcFileAccessCost(clusterNames[i], 0, numDiskReads));
atRestCost += tmpAtRestcost;
accessCost += tmpAccessCost;
}
}
else
{
if (recalcReadCost)
readCost += cost_type2money(calcFileAccessCost(cluster, 0, numDiskReads));
calcFileCost(cluster, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
}
accessCost += cost_type2money(readCost);
}
};

@@ -13330,7 +13345,7 @@ IDFProtectedIterator *CDistributedFileDirectory::lookupProtectedFiles(const char
const char* DFUQResultFieldNames[] = { "@name", "@description", "@group", "@kind", "@modified", "@job", "@owner",
"@DFUSFrecordCount", "@recordCount", "@recordSize", "@DFUSFsize", "@size", "@workunit", "@DFUSFcluster", "@numsubfiles",
"@accessed", "@numparts", "@compressedSize", "@directory", "@partmask", "@superowners", "@persistent", "@protect", "@compressed",
"@cost", "@numDiskReads", "@numDiskWrites", "@atRestCost", "@accessCost", "@maxSkew", "@minSkew", "@maxSkewPart", "@minSkewPart" };
"@cost", "@numDiskReads", "@numDiskWrites", "@atRestCost", "@accessCost", "@maxSkew", "@minSkew", "@maxSkewPart", "@minSkewPart", "@readCost" };

extern da_decl const char* getDFUQResultFieldName(DFUQResultField feild)
{
@@ -13416,9 +13431,16 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu
sizeDiskSize = file->getPropInt64(getDFUQResultFieldName(DFUQRForigsize), 0);
double sizeGB = sizeDiskSize / ((double)1024 * 1024 * 1024);
__int64 numDiskWrites = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
__int64 numDiskReads = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
__int64 readCost = file->getPropInt64("@readCost", MaxStatisticValue);
__int64 numDiskReads = 0;
if (readCost==MaxStatisticValue)
{ // backward compatibility: no read cost field so use numDiskReads to calc cost
readCost = 0;
numDiskReads = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
}
double atRestCost, accessCost;
calcFileCost(nodeGroup, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
accessCost += cost_type2money(readCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFcost), atRestCost+accessCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFaccessCost), accessCost);
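
For context, the dadfs.cpp changes above all implement one backward-compatibility rule: if a file already carries an @readCost attribute, use it directly; if not (a legacy file), derive the read cost from @numDiskReads via calcFileAccessCost. A minimal sketch of that rule, assuming the accessors shown in the hunks (the helper name resolveLegacyReadCost is illustrative only, not part of the platform):

    // Hedged sketch, not platform code; assumes the dadfs.hpp / jlib property-tree APIs are in scope.
    static cost_type resolveLegacyReadCost(const IPropertyTree &attrs, const char *cluster)
    {
        __int64 storedReadCost = attrs.getPropInt64("@readCost", MaxStatisticValue);
        if (storedReadCost != MaxStatisticValue)
            return (cost_type)storedReadCost;               // new files: read cost already persisted
        // Legacy file: no @readCost attribute, so cost the recorded reads instead
        __int64 legacyReads = attrs.getPropInt64("@numDiskReads", 0);
        return money2cost_type(calcFileAccessCost(cluster, 0, legacyReads));
    }
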
1 change: 1 addition & 0 deletions dali/base/dadfs.hpp
@@ -295,6 +295,7 @@ enum DFUQResultField
DFUQRFmaxSkewPart = 31,
DFUQRFminSkewPart = 32,
DFUQRFterm = 33,
DFUQRFreadCost = 34,
DFUQRFreverse = 256,
DFUQRFnocase = 512,
DFUQRFnumeric = 1024,
10 changes: 10 additions & 0 deletions dali/ft/filecopy.cpp
@@ -3771,7 +3771,17 @@ void FileSprayer::updateTargetProperties()
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type prevReadCost = 0;
// Check if it is a legacy file without @readCost attribute -> calculate prevReadCost
if (!fileAttr.hasProp("@readCost") && fileAttr.hasProp("@numDiskReads"))
prevReadCost = calcFileAccessCost(distributedSource, 0, fileAttr.getPropInt64("@numDiskReads", 0));

cost_type readCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
distributedSource->addAttrValue("@readCost", prevReadCost+readCost);
distributedSource->addAttrValue("@numDiskReads", totalNumReads);
}
}
if (error)
throw error.getClear();
12 changes: 11 additions & 1 deletion ecl/hthor/hthor.cpp
@@ -8539,10 +8539,20 @@ void CHThorDiskReadBaseActivity::closepart()
dFile = &(super->querySubFile(subfile, true));
}
}
dFile->addAttrValue("@numDiskReads", curDiskReads);
StringBuffer clusterName;
dFile->getClusterName(0, clusterName);
cost_type previousReadCost = 0;
IPropertyTree & fileAttr = dFile->queryAttributes();
// Check if it is a legacy file without @readCost attribute -> calculate readCost and update file attributes (use previousReadCost to update file with full read costs )
if (!fileAttr.hasProp("@readCost") && fileAttr.hasProp("@numDiskReads"))
{
stat_type prevDiskReads = fileAttr.getPropInt64("@numDiskReads", 0);
previousReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}

diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
dFile->addAttrValue("@numDiskReads", curDiskReads);
dFile->addAttrValue("@readCost", previousReadCost+diskAccessCost);
}
numDiskReads += curDiskReads;
}
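
The closepart() change above mirrors the filecopy.cpp change earlier in this commit: when a file predates @readCost, the historical reads are costed once, then the cost of the reads just performed is appended, and both attributes are bumped with addAttrValue. A hedged sketch of that flow (publishReadCost is an illustrative name, not from the source):

    // Hedged sketch, not platform code; publishReadCost is a hypothetical helper.
    static void publishReadCost(IDistributedFile &dFile, const char *clusterName, stat_type curDiskReads)
    {
        IPropertyTree &fileAttr = dFile.queryAttributes();
        cost_type previousReadCost = 0;
        // Legacy file: @numDiskReads exists but @readCost does not, so cost the historical reads once
        if (!fileAttr.hasProp("@readCost") && fileAttr.hasProp("@numDiskReads"))
            previousReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, fileAttr.getPropInt64("@numDiskReads", 0)));
        cost_type curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
        dFile.addAttrValue("@numDiskReads", curDiskReads);               // accumulate read count
        dFile.addAttrValue("@readCost", previousReadCost + curReadCost); // accumulate read cost
    }
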
4 changes: 2 additions & 2 deletions system/jhtree/jhtree.cpp
@@ -576,9 +576,9 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const
{
if (keyCursor)
keyCursor->mergeStats(targetStats);
keyCursor->mergeStats(targetStats); // merge IO stats
if (stats.ctx)
targetStats.merge(stats.ctx->queryStats());
targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats
}
};

51 changes: 25 additions & 26 deletions thorlcr/activities/indexread/thindexreadslave.cpp
@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (!keyManager)
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
needsBlobCleaning = true;
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
}
void prepareManager(IKeyManager *_keyManager)
{
@@ -296,7 +296,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
part.queryOwner().getClusterLabel(0, planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));

RemoteFilename rfn;
part.getFilename(0, rfn);
@@ -348,44 +348,43 @@ class CIndexReadSlaveBase : public CSlaveActivity
else
return nullptr;
}
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
{
if (!currentManager)
return;
if (fileStats.size()>0)
{
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
if (superFDesc)
{
unsigned subfile, lnum;
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
currentManager->mergeStats(*fileStats[fileTableStart+subfile]);
}
else
{
currentManager->mergeStats(*fileStats[fileTableStart]);
}
}
}
void updateStats()
{
// Merge jhtree cache stats and file io stats for current part
// NB: updateStats() should always be called whilst ioStatsCS is held.
// NB: caller ensures that currentManager must be valid
if (lazyIFileIO)
{
// merge stats for activity
mergeStats(inactiveStats, lazyIFileIO);
currentManager->mergeStats(inactiveStats);
// merge stats for part
if (currentPart<partDescs.ordinality())
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
{
assertex(fileStats.size());
IPartDescriptor & curPartDesc = partDescs.item(currentPart);
ISuperFileDescriptor * superFDesc = curPartDesc.queryOwner().querySuperFileDescriptor();
if (superFDesc)
{
unsigned subfile, lnum;
if(superFDesc->mapSubPart(curPartDesc.queryPartIndex(), subfile, lnum))
currentManager->mergeStats(*fileStats[fileTableStart+subfile]);
}
else
{
currentManager->mergeStats(*fileStats[fileTableStart]);
}
}
}
}
void configureNextInput()
{
if (currentManager)
{
resetManager(currentManager);
currentManager = nullptr;

CriticalBlock b(ioStatsCS);
updateStats();
resetManager(currentManager);
currentManager = nullptr;
lazyIFileIO.clear();
}
IKeyManager *keyManager = nullptr;
@@ -688,7 +687,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
}
data.read(fileTableStart);
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadFileStatistics);
}
}
// IThorDataLink
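
The thindexreadslave.cpp refactor above folds the old mergeFileStats() into updateStats(); the per-file stats slot is still chosen by mapping a superfile part back to its owning subfile. A hedged restatement of that slot selection (statsSlotForPart is an illustrative name only, not from the source):

    // Hedged sketch, not platform code; restates the slot selection now inlined in updateStats().
    static unsigned statsSlotForPart(IPartDescriptor &partDesc, unsigned fileTableStart)
    {
        ISuperFileDescriptor *superFDesc = partDesc.queryOwner().querySuperFileDescriptor();
        if (superFDesc)
        {
            unsigned subfile, lnum;
            if (superFDesc->mapSubPart(partDesc.queryPartIndex(), subfile, lnum))
                return fileTableStart + subfile;   // per-subfile slot
        }
        return fileTableStart;                     // single (non-super) file slot
    }
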
2 changes: 1 addition & 1 deletion thorlcr/activities/keyedjoin/thkeyedjoin.cpp
@@ -304,7 +304,7 @@ class CKeyedJoinMaster : public CMasterActivity
totalIndexParts = 0;

Owned<IDistributedFile> dataFile;
Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry);
Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry);
if (indexFile)
{
if (!isFileKey(indexFile))