Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30974 Copying super files via fileservices/dfu does not track read/cost stats #18157

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 69 additions & 25 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3384,25 +3384,9 @@ void FileSprayer::spray()
afterTransfer();

//If got here then we have succeeded
//Note: On failure, costs will not be updated. Future: would be useful to have a way to update costs on failure.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateTargetProperties();

//Calculate and store file access cost
cost_type fileAccessCost = 0;
if (distributedTarget)
{
StringBuffer cluster;
distributedTarget->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
}
if (distributedSource && distributedSource->querySuperFile()==nullptr)
{
StringBuffer cluster;
distributedSource->getClusterName(0, cluster);
if (!cluster.isEmpty())
fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
}
progressReport->setFileAccessCost(fileAccessCost);
StringBuffer copyEventText; // [logical-source] > [logical-target]
if (distributedSource)
copyEventText.append(distributedSource->queryLogicalName());
Expand Down Expand Up @@ -3455,6 +3439,7 @@ void FileSprayer::updateTargetProperties()
{
TimeSection timer("FileSprayer::updateTargetProperties() time");
Owned<IException> error;
cost_type totalWriteCost = 0;
if (distributedTarget)
{
StringBuffer failedParts;
Expand All @@ -3467,6 +3452,7 @@ void FileSprayer::updateTargetProperties()
bool sameSizeHeaderFooter = isSameSizeHeaderFooter();
bool sameSizeSourceTarget = (sources.ordinality() == distributedTarget->numParts());
offset_t partCompressedLength = 0;
IDistributedSuperFile * superTgt = distributedTarget->querySuperFile();

ForEachItemIn(idx, partition)
{
Expand Down Expand Up @@ -3585,15 +3571,37 @@ void FileSprayer::updateTargetProperties()
partLength = 0;
partCompressedLength = 0;
}

// Update @writeCost and @numWrites in subfile properties and update totalWriteCost
if (superTgt)
{
if (cur.whichOutput != (unsigned)-1)
{
unsigned targetPartNum = targets.item(cur.whichOutput).partNum;
IDistributedFile &subfile = superTgt->querySubFile(targetPartNum, true);
DistributedFilePropertyLock lock(&subfile);
IPropertyTree &subFileProps = lock.queryAttributes();
cost_type prevNumWrites = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites));
cost_type prevWriteCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost));
cost_type curWriteCost = calcFileAccessCost(&subfile, curProgress.numWrites, 0);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), prevWriteCost + curWriteCost);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), prevNumWrites + curProgress.numWrites);
totalWriteCost += curWriteCost;
}
else // not sure if tgt superfile can have whichOutput==-1 (but if so, the following cost calc works)
totalWriteCost += calcFileAccessCost(distributedTarget, curProgress.numWrites, 0);
}
}

if (failedParts.length())
error.setown(MakeStringException(DFTERR_InputOutputCrcMismatch, "%s", failedParts.str()));

DistributedFilePropertyLock lock(distributedTarget);
IPropertyTree &curProps = lock.queryAttributes();
cost_type writeCost = calcFileAccessCost(distributedTarget, totalNumWrites, 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);

if (!superTgt)
totalWriteCost = calcFileAccessCost(distributedTarget, totalNumWrites, 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), totalWriteCost);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);

if (calcCRC())
Expand Down Expand Up @@ -3772,17 +3780,53 @@ void FileSprayer::updateTargetProperties()
if (expireDays != -1)
curProps.setPropInt("@expireDays", expireDays);
}
// Update readCost and numReads in the source file's properties; when the source is a superfile, do the same for each subfile
// Accumulate totalReadCost along the way
cost_type totalReadCost = 0;
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
IDistributedSuperFile * superSrc = distributedSource->querySuperFile();
if (superSrc)
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource);
cost_type curReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
ForEachItemIn(idx, partition)
{
PartitionPoint & cur = partition.item(idx);
OutputProgress & curProgress = progress.item(idx);

if (cur.whichInput != (unsigned)-1)
{
unsigned sourcePartNum = sources.item(cur.whichInput).partNum;
IDistributedFile &subfile = superSrc->querySubFile(sourcePartNum, true);
DistributedFilePropertyLock lock(&subfile);
IPropertyTree &subFileProps = lock.queryAttributes();
stat_type prevNumReads = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(subfile.queryAttributes(), &subfile);
cost_type prevReadCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
cost_type curReadCost = calcFileAccessCost(&subfile, 0, curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + curReadCost);
totalReadCost += curReadCost;
}
else
{
// Unclear whether a source superfile can have whichInput==-1; if it can, this is a best-effort cost calculated against the source file itself
totalReadCost += calcFileAccessCost(distributedSource, 0, curProgress.numReads);
}
}
}
else
{
totalReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
}
DistributedFilePropertyLock lock(distributedSource);
IPropertyTree &curProps = lock.queryAttributes();
stat_type prevNumReads = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(curProps, distributedSource);
cost_type prevReadCost = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + totalNumReads);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + totalReadCost);
}
progressReport->setFileAccessCost(cost_type2money(totalReadCost+totalWriteCost));
if (error)
throw error.getClear();
}
Expand Down
Loading