Skip to content

Commit

Permalink
Merge pull request #18157 from shamser/issue30974
Browse files Browse the repository at this point in the history
HPCC-30974 Copying super files via fileservices/dfu does not track read/cost stats

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jan 19, 2024
2 parents 4b4122b + 831ba96 commit 969f6ef
Showing 1 changed file with 69 additions and 25 deletions.
94 changes: 69 additions & 25 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3384,25 +3384,9 @@ void FileSprayer::spray()
afterTransfer();

//If got here then we have succeeded
//Note: On failure, costs will not be updated. Future: would be useful to have a way to update costs on failure.
updateTargetProperties();

//Calculate and store file access cost
cost_type fileAccessCost = 0;
if (distributedTarget)
{
StringBuffer cluster;
distributedTarget->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
}
if (distributedSource && distributedSource->querySuperFile()==nullptr)
{
StringBuffer cluster;
distributedSource->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
}
progressReport->setFileAccessCost(fileAccessCost);
StringBuffer copyEventText; // [logical-source] > [logical-target]
if (distributedSource)
copyEventText.append(distributedSource->queryLogicalName());
Expand Down Expand Up @@ -3455,6 +3439,7 @@ void FileSprayer::updateTargetProperties()
{
TimeSection timer("FileSprayer::updateTargetProperties() time");
Owned<IException> error;
cost_type totalWriteCost = 0;
if (distributedTarget)
{
StringBuffer failedParts;
Expand All @@ -3467,6 +3452,7 @@ void FileSprayer::updateTargetProperties()
bool sameSizeHeaderFooter = isSameSizeHeaderFooter();
bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
offset_t partCompressedLength = 0;
IDistributedSuperFile * superTgt = distributedTarget->querySuperFile();

ForEachItemIn(idx, partition)
{
Expand Down Expand Up @@ -3585,15 +3571,37 @@ void FileSprayer::updateTargetProperties()
partLength = 0;
partCompressedLength = 0;
}

// Update @writeCost and @numWrites in subfile properties and update totalWriteCost
if (superTgt)
{
if (cur.whichOutput != (unsigned)-1)
{
unsigned targetPartNum = targets.item(cur.whichOutput).partNum;
IDistributedFile &subfile = superTgt->querySubFile(targetPartNum, true);
DistributedFilePropertyLock lock(&subfile);
IPropertyTree &subFileProps = lock.queryAttributes();
cost_type prevNumWrites = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites));
cost_type prevWriteCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost));
cost_type curWriteCost = calcFileAccessCost(&subfile, curProgress.numWrites, 0);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), prevWriteCost + curWriteCost);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), prevNumWrites + curProgress.numWrites);
totalWriteCost += curWriteCost;
}
else // not sure if tgt superfile can have whichOutput==-1 (but if so, the following cost calc works)
totalWriteCost += calcFileAccessCost(distributedTarget, curProgress.numWrites, 0);
}
}

if (failedParts.length())
error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));

DistributedFilePropertyLock lock(distributedTarget);
IPropertyTree &curProps = lock.queryAttributes();
cost_type writeCost = calcFileAccessCost(distributedTarget, totalNumWrites, 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);

if (!superTgt)
totalWriteCost = calcFileAccessCost(distributedTarget, totalNumWrites, 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), totalWriteCost);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);

if (calcCRC())
Expand Down Expand Up @@ -3772,17 +3780,53 @@ void FileSprayer::updateTargetProperties()
if (expireDays != -1)
curProps.setPropInt("@expireDays", expireDays);
}
// Update file readCost and numReads in file properties and do the same for subfiles
// Update totalReadCost
cost_type totalReadCost = 0;
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
IDistributedSuperFile * superSrc = distributedSource->querySuperFile();
if (superSrc)
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource);
cost_type curReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
ForEachItemIn(idx, partition)
{
PartitionPoint & cur = partition.item(idx);
OutputProgress & curProgress = progress.item(idx);

if (cur.whichInput != (unsigned)-1)
{
unsigned sourcePartNum = sources.item(cur.whichInput).partNum;
IDistributedFile &subfile = superSrc->querySubFile(sourcePartNum, true);
DistributedFilePropertyLock lock(&subfile);
IPropertyTree &subFileProps = lock.queryAttributes();
stat_type prevNumReads = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(subfile.queryAttributes(), &subfile);
cost_type prevReadCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
cost_type curReadCost = calcFileAccessCost(&subfile, 0, curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + curReadCost);
totalReadCost += curReadCost;
}
else
{
// not sure if src superfile can have whichInput==-1 (but if so, this is best effort to calc cost)
totalReadCost += calcFileAccessCost(distributedSource, 0, curProgress.numReads);
}
}
}
else
{
totalReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
}
DistributedFilePropertyLock lock(distributedSource);
IPropertyTree &curProps = lock.queryAttributes();
stat_type prevNumReads = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(curProps, distributedSource);
cost_type prevReadCost = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + totalNumReads);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + totalReadCost);
}
progressReport->setFileAccessCost(cost_type2money(totalReadCost+totalWriteCost));
if (error)
throw error.getClear();
}
Expand Down

0 comments on commit 969f6ef

Please sign in to comment.