Skip to content

Commit

Permalink
Merge pull request #18794 from shamser/issue31647
Browse files Browse the repository at this point in the history
HPCC-31647 spill stats for join

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 25, 2024
2 parents c5b3595 + e4082eb commit 29ad0a3
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
6 changes: 4 additions & 2 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr

// NB: Only used by channel 0
Owned<CFileOwner> overflowWriteFile;
Owned<IRowWriter> overflowWriteStream;
Owned<IExtRowWriter> overflowWriteStream;
rowcount_t overflowWriteCount;
OwnedMalloc<IChannelDistributor *> channelDistributors;
unsigned nextRhsToSpill = 0;
Expand Down Expand Up @@ -1881,7 +1881,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
file.setown(container.queryActivity()->createOwnedTempFile(tempName.str()));
VStringBuffer spillPrefixStr("clearAllNonLocalRows(%d)", SPILL_PRIORITY_SPILLABLE_STREAM);
// 3rd param. is skipNulls = true, the row arrays may have had the non-local rows deleted already.
rows.save(file->queryIFile(), spillCompInfo, true, spillPrefixStr.str()); // saves committed rows
rows.save(*file, spillCompInfo, true, spillPrefixStr.str()); // saves committed rows
rows.flushMarker = 0; // reset because array will be moved as a consequence of further adds, so next scan must be from start
}

Expand Down Expand Up @@ -2900,6 +2900,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
return true;
}
if (hasFailedOverToLocal())
Expand Down Expand Up @@ -2949,6 +2950,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
return true;
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
Expand Down
9 changes: 5 additions & 4 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class CSpillableStreamBase : public CSpillable
GetTempFilePath(tempName, tempPrefix.str());
spillFile.setown(activity.createOwnedTempFile(tempName.str()));
VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
rows.save(spillFile->queryIFile(), spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
spillFile->noteSize(spillFile->queryIFile().size());
return true;
Expand Down Expand Up @@ -1375,7 +1375,7 @@ static int callbackSortRev(IInterface * const *cb2, IInterface * const *cb1)
return 1;
}

rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, bool skipNulls, const char *_tracingPrefix)
rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCompInfo, bool skipNulls, const char *_tracingPrefix)
{
rowidx_t n = numCommitted();
if (0 == n)
Expand Down Expand Up @@ -1405,7 +1405,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
nextCB = &cbCopy.popGet();
nextCBI = nextCB->queryRecordNumber();
}
Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags, nullptr, compBlkSz);
Owned<IExtRowWriter> writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz);
rowidx_t i=0;
rowidx_t rowsWritten=0;
try
Expand Down Expand Up @@ -1444,6 +1444,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
++i;
}
writer->flush(NULL);
iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite));
}
catch (IException *e)
{
Expand Down Expand Up @@ -1656,7 +1657,7 @@ class CThorRowCollectorBase : public CSpillable
GetTempFilePath(tempName, tempPrefix.str());
VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority);
Owned<CFileOwner> tempFileOwner = activity.createOwnedTempFile(tempName.str());
spillableRows.save(tempFileOwner->queryIFile(), spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
spillFiles.append(tempFileOwner.getLink());
++overflowCount;
statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thmem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem

//A thread calling the following functions must own the lock, or guarantee no other thread will access
void sort(ICompare & compare, unsigned maxcores);
rowidx_t save(IFile &file, unsigned _spillCompInfo, bool skipNulls, const char *tracingPrefix);
rowidx_t save(CFileOwner &file, unsigned _spillCompInfo, bool skipNulls, const char *tracingPrefix);

inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexR
const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, indexReadFileStatistics);
const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics);
const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, spillStatistics, basicActivityStatistics);
const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics);
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
Expand Down

0 comments on commit 29ad0a3

Please sign in to comment.