From 0caf1f2353f3b07db9a4eab07f7560b6926d66ec Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 19 Dec 2023 11:04:37 +0000 Subject: [PATCH] HPCC-30974 Copying super files via fileservices/dfu does not track read/cost stats. Signed-off-by: Shamser Ahmed --- dali/ft/filecopy.cpp | 93 ++++++++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index f8a8b9bb8bb..2b7b4cc87a5 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -3384,25 +3384,9 @@ void FileSprayer::spray() afterTransfer(); //If got here then we have succeeded + //Note: On failure, costs will not be updated. Future: would be useful to have a way to update costs on failure. updateTargetProperties(); - //Calculate and store file access cost - double fileAccessCost = 0.0; - if (distributedTarget) - { - StringBuffer cluster; - distributedTarget->getClusterName(0, cluster); - if (!cluster.isEmpty()) - fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0); - } - if (distributedSource && distributedSource->querySuperFile()==nullptr) - { - StringBuffer cluster; - distributedSource->getClusterName(0, cluster); - if (!cluster.isEmpty()) - fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads); - } - progressReport->setFileAccessCost(fileAccessCost); StringBuffer copyEventText; // [logical-source] > [logical-target] if (distributedSource) copyEventText.append(distributedSource->queryLogicalName()); @@ -3455,6 +3439,7 @@ void FileSprayer::updateTargetProperties() { TimeSection timer("FileSprayer::updateTargetProperties() time"); Owned error; + cost_type totalWriteCost = 0; if (distributedTarget) { StringBuffer failedParts; @@ -3467,6 +3452,7 @@ void FileSprayer::updateTargetProperties() bool sameSizeHeaderFooter = isSameSizeHeaderFooter(); bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts()); offset_t partCompressedLength = 0; + IDistributedSuperFile * superTgt = distributedTarget->querySuperFile(); ForEachItemIn(idx, partition) { @@ -3585,6 +3571,26 @@ void FileSprayer::updateTargetProperties() partLength = 0; partCompressedLength = 0; } + + // Update @writeCost and @numWrites in subfile properties and update totalWriteCost + if (superTgt) + { + if (cur.whichOutput != (unsigned)-1) + { + unsigned targetPartNum = targets.item(cur.whichOutput).partNum; + IDistributedFile &subfile = superTgt->querySubFile(targetPartNum, true); + DistributedFilePropertyLock lock(&subfile); + IPropertyTree &subFileProps = lock.queryAttributes(); + cost_type prevNumWrites = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites)); + cost_type prevWriteCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)); + cost_type curWriteCost = money2cost_type(calcFileAccessCost(&subfile, curProgress.numWrites, 0)); + subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), prevWriteCost + curWriteCost); + subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), prevNumWrites + curProgress.numWrites); + totalWriteCost += curWriteCost; + } + else // not sure if tgt superfile can have whichOutput==-1 (but if so, the following cost calc works) + totalWriteCost += money2cost_type(calcFileAccessCost(distributedTarget, curProgress.numWrites, 0)); + } } if (failedParts.length()) @@ -3592,8 +3598,10 @@ void FileSprayer::updateTargetProperties() DistributedFilePropertyLock lock(distributedTarget); IPropertyTree &curProps = lock.queryAttributes(); - cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0)); - curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost); + + if (!superTgt) + totalWriteCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0)); + curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), totalWriteCost); curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites); if (calcCRC()) @@ -3771,17 +3779,52 @@ void FileSprayer::updateTargetProperties() if (expireDays != -1) curProps.setPropInt("@expireDays", expireDays); } + // Update file readCost and numReads in file properties and do the same for subfiles + // Update totalReadCost + cost_type totalReadCost = 0; if (distributedSource) { - if (distributedSource->querySuperFile()==nullptr) + IDistributedSuperFile * superSrc = distributedSource->querySuperFile(); + if (superSrc) { - IPropertyTree & fileAttr = distributedSource->queryAttributes(); - cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource); - cost_type curReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads)); - distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost); - distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads); + ForEachItemIn(idx, partition) + { + PartitionPoint & cur = partition.item(idx); + OutputProgress & curProgress = progress.item(idx); + + if (cur.whichInput != (unsigned)-1) + { + unsigned sourcePartNum = sources.item(cur.whichInput).partNum; + IDistributedFile &subfile = superSrc->querySubFile(sourcePartNum, true); + DistributedFilePropertyLock lock(&subfile); + IPropertyTree &subFileProps = lock.queryAttributes(); + stat_type prevNumReads = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); + cost_type legacyReadCost = getLegacyReadCost(subfile.queryAttributes(), &subfile); + cost_type prevReadCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0); + cost_type curReadCost = money2cost_type(calcFileAccessCost(&subfile, 0, curProgress.numReads)); + subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + curProgress.numReads); + subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + curReadCost); + totalReadCost += curReadCost; + } + else + { + totalReadCost += money2cost_type(calcFileAccessCost(distributedSource, 0, curProgress.numReads)); + } + } } + else + { + totalReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads)); + } + DistributedFilePropertyLock lock(distributedSource); + IPropertyTree &curProps = lock.queryAttributes(); + stat_type prevNumReads = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0); + cost_type legacyReadCost = getLegacyReadCost(curProps, distributedSource); + cost_type prevReadCost = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0); + curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + totalNumReads); + curProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + totalReadCost); } + progressReport->setFileAccessCost(cost_type2money(totalReadCost+totalWriteCost)); if (error) throw error.getClear(); }