Skip to content

Commit

Permalink
HPCC-30974 Copying super files via fileservices/dfu does not track read/cost stats.
Browse files Browse the repository at this point in the history

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Dec 20, 2023
1 parent c3e6871 commit 19c686e
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3384,25 +3384,9 @@ void FileSprayer::spray()
afterTransfer();

//If got here then we have succeeded
//(note: if we don't get here, file access costs will not be updated)
updateTargetProperties();

//Calculate and store file access cost
double fileAccessCost = 0.0;
if (distributedTarget)
{
StringBuffer cluster;
distributedTarget->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
}
if (distributedSource && distributedSource->querySuperFile()==nullptr)
{
StringBuffer cluster;
distributedSource->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
}
progressReport->setFileAccessCost(fileAccessCost);
StringBuffer copyEventText; // [logical-source] > [logical-target]
if (distributedSource)
copyEventText.append(distributedSource->queryLogicalName());
Expand Down Expand Up @@ -3455,6 +3439,7 @@ void FileSprayer::updateTargetProperties()
{
TimeSection timer("FileSprayer::updateTargetProperties() time");
Owned<IException> error;
cost_type totalCost = 0;
if (distributedTarget)
{
StringBuffer failedParts;
Expand All @@ -3468,6 +3453,7 @@ void FileSprayer::updateTargetProperties()
bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
offset_t partCompressedLength = 0;

IDistributedSuperFile * superTgt = distributedTarget->querySuperFile();
ForEachItemIn(idx, partition)
{
PartitionPoint & cur = partition.item(idx);
Expand Down Expand Up @@ -3585,16 +3571,29 @@ void FileSprayer::updateTargetProperties()
partLength = 0;
partCompressedLength = 0;
}

// Update cost/numWrites for target file or subfile (if superfile)
IDistributedFile * tgtDistFile = nullptr;
if (superTgt)
{
unsigned subfilepartidx = 0;
tgtDistFile = superTgt->querySubPart(cur.whichOutput, subfilepartidx);
}
else
tgtDistFile = distributedTarget;
DistributedFilePropertyLock lock(tgtDistFile);
IPropertyTree &tgtProps = lock.queryAttributes();
cost_type writeCost = money2cost_type(calcFileAccessCost(tgtDistFile, curProgress.numWrites, 0));
tgtProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
tgtProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);
totalCost += writeCost;
}

if (failedParts.length())
error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));

DistributedFilePropertyLock lock(distributedTarget);
IPropertyTree &curProps = lock.queryAttributes();
cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0));
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);

if (calcCRC())
curProps.setPropInt(FAcrc, totalCRC.get());
Expand Down Expand Up @@ -3773,15 +3772,29 @@ void FileSprayer::updateTargetProperties()
}
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
IDistributedSuperFile * superSrc = distributedSource->querySuperFile();
ForEachItemIn(idx, partition)
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource);
cost_type curReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads));
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
PartitionPoint & cur = partition.item(idx);
OutputProgress & curProgress = progress.item(idx);

// Update cost/numReads for source file or subfile (if superfile)
IDistributedFile * srcDistFile = nullptr;
if (superSrc)
{
unsigned subfilepartidx = 0;
srcDistFile = superSrc->querySubPart(cur.whichInput, subfilepartidx);
}
else
srcDistFile = distributedSource;
cost_type legacyReadCost = getLegacyReadCost(srcDistFile->queryAttributes(), srcDistFile);
cost_type readCost = money2cost_type(calcFileAccessCost(srcDistFile, curProgress.numReads, 0));
srcDistFile->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+readCost);
srcDistFile->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curProgress.numReads);
totalCost += readCost;
}
}
progressReport->setFileAccessCost(cost_type2money(totalCost));
if (error)
throw error.getClear();
}
Expand Down

0 comments on commit 19c686e

Please sign in to comment.