From f5cb973e78352433a17104c38db3060a08726bbe Mon Sep 17 00:00:00 2001
From: Shamser Ahmed
Date: Tue, 19 Dec 2023 11:04:37 +0000
Subject: [PATCH] HPCC-30974 Copying super files via fileservices/dfu does not track read/cost stats.

File access cost accounting moves out of FileSprayer::spray() and into
FileSprayer::updateTargetProperties(), where it is recorded per partition.
When the source or target is a superfile, the counters are applied to the
file returned by querySubPart() for that partition's input/output part
instead of being skipped.

Review fixes folded into this revision:
 * Read cost is computed with calcFileAccessCost(file, 0, numReads); the
   read count was previously passed in the numWrites position.
 * Target writeCost/numDiskWrites are accumulated across partitions rather
   than being overwritten on every iteration (which left only the last
   partition's cost, paired with the job-wide totalNumWrites).
   NOTE(review): this assumes the target's attributes do not already carry
   these counters when the spray starts - confirm for reused targets.

Signed-off-by: Shamser Ahmed
---
 dali/ft/filecopy.cpp | 65 ++++++++++++++++++++++++++------------------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp
index f8a8b9bb8bb..50d3b58c226 100644
--- a/dali/ft/filecopy.cpp
+++ b/dali/ft/filecopy.cpp
@@ -3384,25 +3384,9 @@ void FileSprayer::spray()
     afterTransfer();
 
     //If got here then we have succeeded
+    //(note: if we don't get here, file access costs will not be updated)
     updateTargetProperties();
 
-    //Calculate and store file access cost
-    double fileAccessCost = 0.0;
-    if (distributedTarget)
-    {
-        StringBuffer cluster;
-        distributedTarget->getClusterName(0, cluster);
-        if (!cluster.isEmpty())
-            fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
-    }
-    if (distributedSource && distributedSource->querySuperFile()==nullptr)
-    {
-        StringBuffer cluster;
-        distributedSource->getClusterName(0, cluster);
-        if (!cluster.isEmpty())
-            fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
-    }
-    progressReport->setFileAccessCost(fileAccessCost);
     StringBuffer copyEventText; // [logical-source] > [logical-target]
     if (distributedSource)
         copyEventText.append(distributedSource->queryLogicalName());
@@ -3455,6 +3439,7 @@ void FileSprayer::updateTargetProperties()
 {
     TimeSection timer("FileSprayer::updateTargetProperties() time");
     Owned<IException> error;
+    cost_type totalCost = 0; // running sum of write + read cost, reported via progressReport at the end
     if (distributedTarget)
     {
         StringBuffer failedParts;
@@ -3468,6 +3453,7 @@ void FileSprayer::updateTargetProperties()
         bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
         offset_t partCompressedLength = 0;
 
+        IDistributedSuperFile * superTgt = distributedTarget->querySuperFile(); // non-null only when the target is a superfile
         ForEachItemIn(idx, partition)
         {
             PartitionPoint & cur = partition.item(idx);
@@ -3585,6 +3571,22 @@ void FileSprayer::updateTargetProperties()
                 partLength = 0;
                 partCompressedLength = 0;
             }
+
+            // Accumulate this partition's write cost/numWrites on the target file, or on the subfile holding the output part (if superfile)
+            IDistributedFile * tgtDistFile = nullptr;
+            if (superTgt)
+            {
+                unsigned subfilepartidx = 0;
+                tgtDistFile = superTgt->querySubPart(cur.whichOutput, subfilepartidx);
+            }
+            else
+                tgtDistFile = distributedTarget;
+            DistributedFilePropertyLock lock(tgtDistFile);
+            IPropertyTree &tgtProps = lock.queryAttributes();
+            cost_type writeCost = money2cost_type(calcFileAccessCost(tgtDistFile, curProgress.numWrites, 0));
+            tgtProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), tgtProps.getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)) + writeCost);
+            tgtProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), tgtProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites)) + curProgress.numWrites);
+            totalCost += writeCost;
         }
 
         if (failedParts.length())
@@ -3592,9 +3594,6 @@ void FileSprayer::updateTargetProperties()
 
         DistributedFilePropertyLock lock(distributedTarget);
         IPropertyTree &curProps = lock.queryAttributes();
-        cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0));
-        curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
-        curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);
         if (calcCRC())
             curProps.setPropInt(FAcrc, totalCRC.get());
 
@@ -3773,15 +3772,29 @@ void FileSprayer::updateTargetProperties()
     }
     if (distributedSource)
     {
-        if (distributedSource->querySuperFile()==nullptr)
+        IDistributedSuperFile * superSrc = distributedSource->querySuperFile();
+        ForEachItemIn(idx, partition)
         {
-            IPropertyTree & fileAttr = distributedSource->queryAttributes();
-            cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource);
-            cost_type curReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads));
-            distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost);
-            distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
+            PartitionPoint & cur = partition.item(idx);
+            OutputProgress & curProgress = progress.item(idx);
+
+            // Update cost/numReads for source file or subfile (if superfile); calcFileAccessCost takes (file, numWrites, numReads)
+            IDistributedFile * srcDistFile = nullptr;
+            if (superSrc)
+            {
+                unsigned subfilepartidx = 0;
+                srcDistFile = superSrc->querySubPart(cur.whichInput, subfilepartidx);
+            }
+            else
+                srcDistFile = distributedSource;
+            cost_type legacyReadCost = getLegacyReadCost(srcDistFile->queryAttributes(), srcDistFile);
+            cost_type readCost = money2cost_type(calcFileAccessCost(srcDistFile, 0, curProgress.numReads));
+            srcDistFile->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+readCost);
+            srcDistFile->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curProgress.numReads);
+            totalCost += readCost;
         }
     }
+    progressReport->setFileAccessCost(cost_type2money(totalCost));
     if (error)
         throw error.getClear();
 }