
Merge remote-tracking branch 'origin/candidate-9.8.x'
Signed-off-by: Jake Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	version.cmake
jakesmith committed Nov 11, 2024
2 parents 67facac + 220ee22 commit 76bf516
Showing 35 changed files with 1,002 additions and 462 deletions.
129 changes: 98 additions & 31 deletions dali/ft/filecopy.cpp
@@ -886,7 +886,7 @@ void FileSprayer::afterTransfer()
}
}

bool FileSprayer::allowSplit()
bool FileSprayer::allowSplit() const
{
return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix));
}
@@ -1592,12 +1592,31 @@ void FileSprayer::commonUpSlaves()
cur.whichSlave = 0;
}

if (options->getPropBool(ANnocommon, true) || pushWhole)
if (pushWhole)
return;

// noCommon is defaulted to on for non-containerized (revisit!)
bool noCommon = options->getPropBool(ANnocommon, !isContainerized());
if (noCommon)
{
if (!isContainerized())
return;
IWARNLOG("Ignoring noCommon option in containerized mode");
}

//First work out which are the same slaves, and then map the partition.
//Previously it was n^2 in partition, which is fine until you spray 100K files.
unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
bool commonByIp = !isContainerized();

offset_t totalSourceFileSize = 0;
offset_t threshold = 0x8000 * numSlaves;
ForEachItemIn(i, sources)
{
const FilePartInfo & cur = sources.item(i);
totalSourceFileSize += copyCompressed ? cur.psize : cur.size;
}

unsigned * slaveMapping = new unsigned [numSlaves];
for (unsigned i = 0; i < numSlaves; i++)
slaveMapping[i] = i;
@@ -1609,22 +1628,32 @@ void FileSprayer::commonUpSlaves()
TargetLocation & cur = targets.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp)
match = targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (!targetSupportsConcurrentWrite || totalSourceFileSize < threshold)
match = targets.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
}
}
}
}
else
else // push
{
for (unsigned i1 = 1; i1 < numSlaves; i1++)
{
FilePartInfo & cur = sources.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp) // match by IP
match = sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (totalSourceFileSize < threshold)
match = sources.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
@@ -1633,7 +1662,6 @@
}
}


for (unsigned i3 = 0; i3 < max; i3++)
{
PartitionPoint & cur = partition.item(i3);
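
The matching rule introduced above can be condensed as follows. This is a minimal sketch of the target-side (pull) branch, not the production code: PartRef and shouldCommonUp are hypothetical stand-ins for the TargetLocation/FilePartInfo lookups, assuming the same 0x8000-bytes-per-slave threshold.

#include <cstdint>
#include <string>

// Hypothetical stand-in for TargetLocation/FilePartInfo: only the fields the
// matching rule inspects.
struct PartRef
{
    std::string ip;    // resolved endpoint (meaningful on bare metal only)
    std::string path;  // full target file path
};

// Sketch of the commoning rule (target/pull side): bare metal commons parts
// on the same host; containerized commons parts only when they address the
// same file and either the plane forbids concurrent writers or the total
// source size is below the 0x8000 * numSlaves threshold.
bool shouldCommonUp(const PartRef & a, const PartRef & b,
                    bool commonByIp, bool targetSupportsConcurrentWrite,
                    uint64_t totalSourceFileSize, unsigned numSlaves)
{
    if (commonByIp)
        return a.ip == b.ip;
    const uint64_t threshold = 0x8000ULL * numSlaves;
    if (!targetSupportsConcurrentWrite || totalSourceFileSize < threshold)
        return a.path == b.path;
    return false;
}

On the source (push) side the same shape applies, except that only the size threshold is tested before matching by filename.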
@@ -2493,7 +2521,6 @@ void FileSprayer::insertHeaders()
}
}


bool FileSprayer::needToCalcOutput()
{
return !usePullOperation() || options->getPropBool(ANverify);
@@ -2607,6 +2634,7 @@ void FileSprayer::pullParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
@@ -2662,6 +2690,7 @@ void FileSprayer::pushParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
@@ -3048,6 +3077,7 @@ void FileSprayer::setTarget(IDistributedFile * target)
TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy), idx);
targets.append(next);
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
@@ -3069,6 +3099,7 @@ void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
target->getFilename(idx, copy, filename);
targets.append(*new TargetLocation(filename, idx));
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
@@ -3281,6 +3312,8 @@ void FileSprayer::spray()
return;
}

targetSupportsConcurrentWrite = getConcurrentWriteSupported(targetPlane);

checkFormats();
checkForOverlap();

@@ -3396,7 +3429,9 @@ void FileSprayer::spray()

//If got here then we have succeeded
//Note: On failure, costs will not be updated. Future: would be useful to have a way to update costs on failure.
updateTargetProperties();
cost_type totalWriteCost = updateTargetProperties();
cost_type totalReadCost = updateSourceProperties();
progressReport->setFileAccessCost(totalReadCost+totalWriteCost);

StringBuffer copyEventText; // [logical-source] > [logical-target]
if (distributedSource)
@@ -3446,13 +3481,13 @@ bool FileSprayer::isSameSizeHeaderFooter()
return retVal;
}

void FileSprayer::updateTargetProperties()
cost_type FileSprayer::updateTargetProperties()
{
TimeSection timer("FileSprayer::updateTargetProperties() time");
Owned<IException> error;
cost_type totalWriteCost = 0;
if (distributedTarget)
{
cost_type totalWriteCost = 0;
StringBuffer failedParts;
CRC32Merger partCRC;
offset_t partLength = 0;
@@ -3803,12 +3838,20 @@ void FileSprayer::updateTargetProperties()
int expireDays = options->getPropInt("@expireDays", -1);
if (expireDays != -1)
curProps.setPropInt("@expireDays", expireDays);
return totalWriteCost;
}
if (error)
throw error.getClear();
return 0;
}

cost_type FileSprayer::updateSourceProperties()
{
TimeSection timer("FileSprayer::updateSourceProperties() time");
// Update file readCost and numReads in file properties and do the same for subfiles
// Update totalReadCost
cost_type totalReadCost = 0;
if (distributedSource)
{
cost_type totalReadCost = 0;
IDistributedSuperFile * superSrc = distributedSource->querySuperFile();
if (superSrc && superSrc->numSubFiles() > 0)
{
@@ -3833,14 +3876,10 @@ void FileSprayer::updateTargetProperties()
// so query the first (and only) subfile
subfile = &superSrc->querySubFile(0);
}
DistributedFilePropertyLock lock(subfile);
IPropertyTree &subFileProps = lock.queryAttributes();
stat_type prevNumReads = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(subfile->queryAttributes(), subfile);
cost_type prevReadCost = subFileProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
cost_type curReadCost = calcFileAccessCost(subfile, 0, curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + curProgress.numReads);
subFileProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + curReadCost);
subfile->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curProgress.numReads);
cost_type legacyReadCost = getLegacyReadCost(subfile->queryAttributes(), subfile);
subfile->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
totalReadCost += curReadCost;
}
else
@@ -3854,20 +3893,14 @@ void FileSprayer::updateTargetProperties()
{
totalReadCost = calcFileAccessCost(distributedSource, 0, totalNumReads);
}
DistributedFilePropertyLock lock(distributedSource);
IPropertyTree &curProps = lock.queryAttributes();
stat_type prevNumReads = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
cost_type legacyReadCost = getLegacyReadCost(curProps, distributedSource);
cost_type prevReadCost = curProps.getPropInt64(getDFUQResultFieldName(DFUQRFreadCost), 0);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), prevNumReads + totalNumReads);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + prevReadCost + totalReadCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
cost_type legacyReadCost = getLegacyReadCost(distributedSource->queryAttributes(), distributedSource);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + totalReadCost);
return totalReadCost; // return the total cost of this file operation (exclude previous and legacy read costs)
}
progressReport->setFileAccessCost(totalReadCost+totalWriteCost);
if (error)
throw error.getClear();
return 0;
}
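
The rewritten bookkeeping drops the DistributedFilePropertyLock read-modify-write sequence in favour of single accumulating updates. Below is a simplified sketch of the pattern, assuming addAttrValue adds a delta to a stored attribute; FileAttrs, updateReadStats, and the plain string keys are illustrative, not the real Dali interfaces.

#include <cstdint>
#include <map>
#include <string>

using cost_type = uint64_t;
using stat_type = uint64_t;

// Toy attribute store standing in for the distributed-file metadata;
// addAttrValue models the accumulate-in-place call the diff switches to,
// replacing the deleted lock / read / add / write sequence.
struct FileAttrs
{
    std::map<std::string, uint64_t> attrs;
    void addAttrValue(const std::string & name, uint64_t delta)
    {
        attrs[name] += delta; // one accumulating update, no separate read-modify-write
    }
};

// Shape of the new updateSourceProperties() bookkeeping: fold the legacy cost
// into the stored total once, but return only this operation's cost so the
// caller (spray()) can report read+write cost for the current copy alone.
cost_type updateReadStats(FileAttrs & file, stat_type numReads,
                          cost_type curReadCost, cost_type legacyReadCost)
{
    file.addAttrValue("numDiskReads", numReads);
    file.addAttrValue("readCost", legacyReadCost + curReadCost);
    return curReadCost; // exclude previous and legacy read costs
}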


void FileSprayer::splitAndCollectFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
bool isDistributedSource)
{
@@ -3984,11 +4017,45 @@ bool FileSprayer::calcUsePull() const
LOG(MCdebugInfo, "Use pull since explicitly specified");
return true;
}
if (options->getPropBool(ANpush, false))

bool pushRequested = options->getPropBool(ANpush);
if (!targetSupportsConcurrentWrite) // NB: default for containerized is false
{
if (!pushRequested)
return true;
if (!usePushWholeOperation())
{
if (targets.ordinality() <= sources.ordinality())
{
// NB: this is being calculated before partitioning has occurred
// It can be refactored so that it decides after partitioning, and only has to force pull
// if multiple partitions write to same target file.
LOG(MCdebugInfo, "Use pull operation because target doesn't support concurrent write");
return true;
}
// else targets > sources

// if push requested and N:M and no split, then throw an error unless expert option allows
if (!copySource) // 1:1 partitioning if copySource==true
{
if ((sources.ordinality() > 1) && (targets.ordinality() > 1) && !allowSplit())
{
if (!getComponentConfigSP()->getPropBool("expert/@allowPushNoSplit"))
throw makeStringExceptionV(0, "Pushing to multiple targets with no split is not supported to this target plane (%s)", targetPlane.str());
}
}
}
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
else // targetSupportsConcurrentWrite
{
if (pushRequested)
{
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
}

ForEachItemIn(idx2, sources)
{
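Condensed, the new pull-versus-push decision reads roughly as below. This is a sketch with flattened inputs, not the member function above; SprayDecisionInputs and usePull are illustrative names, and the trailing heuristics of calcUsePull() are omitted.

#include <stdexcept>

struct SprayDecisionInputs
{
    bool pullRequested = false;
    bool pushRequested = false;
    bool targetSupportsConcurrentWrite = true; // false by default on containerized planes
    bool pushWhole = false;
    bool copySource = false;        // copySource==true implies 1:1 partitioning
    bool splitAllowed = true;       // allowSplit()
    bool expertAllowPushNoSplit = false;
    unsigned numSources = 0;
    unsigned numTargets = 0;
};

// Mirrors the branch structure added to calcUsePull(): planes without
// concurrent-write support force pull unless push is explicitly requested,
// and an explicit N:M push with no split is rejected unless the expert
// option allows it.
bool usePull(const SprayDecisionInputs & in)
{
    if (in.pullRequested)
        return true;
    if (!in.targetSupportsConcurrentWrite)
    {
        if (!in.pushRequested)
            return true;
        if (!in.pushWhole)
        {
            if (in.numTargets <= in.numSources)
                return true; // multiple partitions could write the same target file
            if (!in.copySource && in.numSources > 1 && in.numTargets > 1
                && !in.splitAllowed && !in.expertAllowPushNoSplit)
                throw std::runtime_error("push with no split unsupported on this plane");
        }
        return false; // explicit push honoured
    }
    if (in.pushRequested)
        return false;
    return true; // placeholder: the remaining heuristics (omitted) decide in the real code
}
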
7 changes: 5 additions & 2 deletions dali/ft/filecopy.ipp
@@ -222,7 +222,7 @@ protected:
void addTarget(unsigned idx, INode * node);
void afterGatherFileSizes();
void afterTransfer();
bool allowSplit();
bool allowSplit() const;
void analyseFileHeaders(bool setcurheadersize);
void assignPartitionFilenames();
void beforeTransfer();
@@ -271,7 +271,8 @@ protected:
void savePartition();
void setCopyCompressedRaw();
void setSource(IFileDescriptor * source, unsigned copy, unsigned mirrorCopy = (unsigned)-1);
void updateTargetProperties();
cost_type updateTargetProperties();
cost_type updateSourceProperties();
bool usePullOperation() const;
bool usePushOperation() const;
bool usePushWholeOperation() const;
@@ -307,6 +308,8 @@ protected:
Linked<IDistributedFile> distributedTarget;
Linked<IDistributedFile> distributedSource;
TargetLocationArray targets;
StringBuffer targetPlane;
bool targetSupportsConcurrentWrite = true; // if false, will prevent multiple writers to same target file (e.g. not supported by Azure Blob storage)
FileFormat srcFormat;
FileFormat tgtFormat;
Owned<IDFPartFilter> filter;
17 changes: 15 additions & 2 deletions ecl/hqlcpp/hqlckey.cpp
@@ -305,9 +305,22 @@ IHqlExpression * KeyedJoinInfo::querySimplifiedKey(IHqlExpression * expr)
}
}

IHqlExpression * queryBaseIndexForKeyedJoin(IHqlExpression * expr)
{
if (expr->getOperator() == no_if)
{
IHqlExpression * left = queryBaseIndexForKeyedJoin(expr->queryChild(1));
IHqlExpression * right = queryBaseIndexForKeyedJoin(expr->queryChild(2));
if (left && right)
return left;
return nullptr;
}
return queryPhysicalRootTable(expr);
}

IHqlExpression * KeyedJoinInfo::createKeyFromComplexKey(IHqlExpression * expr)
{
IHqlExpression * base = queryPhysicalRootTable(expr);
IHqlExpression * base = queryBaseIndexForKeyedJoin(expr);
if (!base)
{
translator.throwError1(HQLERR_KeyedJoinNoRightIndex_X, getOpString(expr->getOperator()));
@@ -1234,7 +1247,7 @@ void HqlCppTranslator::buildKeyJoinIndexReadHelper(ActivityInstance & instance,
buildFilenameFunction(instance, instance.startctx, WaIndexname, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey()), SummaryType::ReadIndex, info->isKeyOpt(), info->isKeySigned());

//virtual IOutputMetaData * queryIndexRecordSize() = 0;
LinkedHqlExpr indexExpr = info->queryOriginalKey();
LinkedHqlExpr indexExpr = info->queryKey();
OwnedHqlExpr serializedRecord;
unsigned numPayload = numPayloadFields(indexExpr);
if (numPayload)
19 changes: 15 additions & 4 deletions esp/src/eclwatch/ClusterProcessesQueryWidget.js
@@ -2,6 +2,7 @@ define([
"dojo/_base/declare",
"src/nlsHPCC",
"dojo/topic",
"dojo/dom-construct",

"dijit/registry",

@@ -20,7 +21,7 @@
"hpcc/IFrameWidget",

"dijit/Dialog",
], function (declare, nlsHPCCMod, topic,
], function (declare, nlsHPCCMod, topic, domConstruct,
registry,
tree, selector,
GridDetailsWidget, ESPPreflight, ESPRequest, WsTopology, Utility, ESPUtil, DelayLoadWidget, PreflightDetailsWidget, MachineInformationWidget, IFrameWidget) {
@@ -50,6 +51,10 @@
style: "border: 0; width: 100%; height: 100%"
});
this.legacyClustersProcessesIframeWidget.placeAt(this._tabContainer, "last");
if (dojoConfig.isContainer) {
const legacyTab = registry.byId(this.id + "_LegacyClustersProcessesIframeWidget");
legacyTab.set("disabled", true);
}
},

init: function (params) {
@@ -85,9 +90,15 @@
if (currSel.id === this.id + "_Grid") {
this.refreshGrid();
} else if (currSel.id === this.legacyClustersProcessesIframeWidget.id && !this.legacyClustersProcessesIframeWidget.initalized) {
this.legacyClustersProcessesIframeWidget.init({
src: ESPRequest.getBaseURL("WsTopology") + "/TpClusterQuery?Type=ROOT"
});
if (!dojoConfig.isContainer) {
this.legacyClustersProcessesIframeWidget.init({
src: ESPRequest.getBaseURL("WsTopology") + "/TpClusterQuery?Type=ROOT"
});
} else {
const unavailMsg = domConstruct.create("div", { style: { margin: "5px" } });
unavailMsg.innerText = this.i18n.UnavailableInContainerized;
this.legacyClustersProcessesIframeWidget.contentPane.set("content", unavailMsg);
}
} else {
currSel.init(currSel.params);
}
