Skip to content

Commit

Permalink
HPCC-31238 Record spill stats for hash dedup
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Mar 18, 2024
1 parent 43962f5 commit b04690e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 11 deletions.
19 changes: 16 additions & 3 deletions common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,10 @@ class CRowStreamWriter : private IRowSerializerTarget, implements IExtRowWriter,
--nested;
}

// Reports the requested statistic by delegating to the underlying stream.
virtual unsigned __int64 getStatistic(StatisticKind kind) override
{
    const unsigned __int64 streamValue = stream->getStatistic(kind);
    return streamValue;
}
};

#ifdef TRACE_CREATE
Expand Down Expand Up @@ -1648,18 +1652,27 @@ IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned fla
return createRowWriter(iFileIO, rowIf, flags);
}

// Create a row writer on top of an already-open IFileIO.
//
// iFileIO         - the file IO to write to; wrapped in a compressing writer when
//                   rw_compress is set in 'flags'.
// rowIf           - row interfaces; the serialized disk meta supplies the fixed row
//                   size (0 if variable) handed to the compressor.
// flags           - rw_* flags. rw_compress, rw_grouped, rw_compressblkcrc, rw_buffered
//                   and rw_extend are consumed here; rw_extend, rw_buffered and the
//                   COMP_MASK bits are cleared before delegating to the stream overload.
// compressor      - optional compressor passed through to createCompressedFileWriter.
// compressorBlkSz - if non-zero, overrides the block size of the compressed writer.
//
// Returns the writer produced by the IFileIOStream overload of createRowWriter.
IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor, size32_t compressorBlkSz)
{
    Owned<ICompressedFileIO> compressedFileIO;
    if (TestRwFlag(flags, rw_compress))
    {
        size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize();
        if (fixedSize && TestRwFlag(flags, rw_grouped))
            ++fixedSize; // row writer will include a grouping byte
        // The second argument is driven by rw_extend (the same flag that triggers the
        // seek-to-end below) rather than repeating the CRC flag used for the fourth.
        // NOTE(review): assumes the second parameter of createCompressedFileWriter(IFileIO *, ...)
        // is the 'append' flag - confirm against its declaration.
        compressedFileIO.setown(createCompressedFileWriter(iFileIO, TestRwFlag(flags, rw_extend), fixedSize, TestRwFlag(flags, rw_compressblkcrc), compressor, getCompMethod(flags)));
        if (compressorBlkSz)
            compressedFileIO->setBlockSize(compressorBlkSz);
        iFileIO = compressedFileIO.get(); // everything below writes through the compressing wrapper
    }
    Owned<IFileIOStream> stream;
    if (TestRwFlag(flags, rw_buffered))
        stream.setown(createBufferedIOStream(iFileIO));
    else
        stream.setown(createIOStream(iFileIO));
    if (flags & rw_extend)
        stream->seek(0, IFSend);
    // These flags have been handled at this level, so strip them before delegating.
    flags &= ~((unsigned)(rw_extend|rw_buffered|COMP_MASK));
    return createRowWriter(stream, rowIf, flags);
}

Expand Down
3 changes: 2 additions & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ interface IExtRowWriter: extends IRowWriter
virtual offset_t getPosition() = 0;
using IRowWriter::flush;
virtual void flush(CRC32 *crcout) = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
};

enum EmptyRowSemantics { ers_forbidden, ers_allow, ers_eogonly };
Expand Down Expand Up @@ -210,7 +211,7 @@ extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces
extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback = nullptr);
interface ICompressor;
extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL, size32_t compressorBlkSz=0);
extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, size32_t compressorBlkSz=0);
extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=nullptr, size32_t compressorBlkSz=0);
extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered

interface THORHELPER_API IDiskMerger : extends IInterface
Expand Down
8 changes: 7 additions & 1 deletion thorlcr/activities/hashdistrib/thhashdistrib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class HashDistributeActivityMaster : public HashDistributeMasterBase
HashDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeMasterBase(mode, info) { }
};

// Master activity for hash dedup: the same as HashDistributeMasterBase except that
// hashDedupActivityStatistics is passed to the base constructor.
class HashDedupActivityMaster : public HashDistributeMasterBase
{
public:
    HashDedupActivityMaster(DistributeMode mode, CMasterGraphElement *info)
        : HashDistributeMasterBase(mode, info, hashDedupActivityStatistics)
    {
    }
};

class HashJoinDistributeActivityMaster : public HashDistributeMasterBase
{
public:
Expand Down Expand Up @@ -233,7 +239,7 @@ CActivityBase *createHashDedupMergeActivityMaster(CMasterGraphElement *container
if (container->queryLocalOrGrouped())
return new CMasterActivity(container);
else
return new HashDistributeActivityMaster(DM_dedup, container);
return new HashDedupActivityMaster(DM_dedup, container);
}

CActivityBase *createHashJoinActivityMaster(CMasterGraphElement *container)
Expand Down
82 changes: 76 additions & 6 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#define HDSendPrintLog5(M,P1,P2,P3,P4)
#endif


class CDistributorBase : implements IHashDistributor, implements IExceptionHandler, public CInterface
{
Linked<IThorRowInterfaces> rowIf;
Expand Down Expand Up @@ -2649,6 +2650,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
IThorRowInterfaces *rowIf;
rowcount_t count;
Owned<CFileOwner> spillFile;
Owned<IFileIO> spillFileIO;
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Expand Down Expand Up @@ -2683,9 +2685,10 @@ class CSpill : implements IRowWriter, public CSimpleInterface
owner.getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
setCompFlag(compType, rwFlags);
}
writer = createRowWriter(iFile, rowIf, rwFlags);
spillFileIO.setown(iFile->open((rwFlags & rw_extend)?IFOwrite:IFOcreate));
writer = createRowWriter(spillFileIO, rowIf, rwFlags);
}
IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detatches ownership of 'fileOwner'
IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detaches ownership of 'fileOwner'
{
assertex(NULL == writer); // should have been closed
Owned<CFileOwner> fileOwner = spillFile.getClear();
Expand All @@ -2706,17 +2709,27 @@ class CSpill : implements IRowWriter, public CSimpleInterface
{
if (NULL == writer)
return;
flush();
writer->flush();
spillFileIO->flush();
::Release(writer);
writer = NULL;
}
// True when there is no active writer (close() releases the writer and resets it to NULL).
bool isClosed() const
{
    return nullptr == writer;
}
// Returns the requested statistic as recorded by the spill file's IFileIO.
// spillFileIO is only assigned when the spill file is opened, so a spill that was
// never opened reports 0 instead of dereferencing a null pointer (close() is
// similarly tolerant of a spill that has no writer).
unsigned __int64 getStatistic(StatisticKind kind) const
{
    if (!spillFileIO.get())
        return 0;
    return spillFileIO->getStatistic(kind);
}

// IRowWriter
// Writes the row via the underlying writer, then bumps the running row count.
virtual void putRow(const void *row)
{
    writer->putRow(row);
    // NULL rows are counted as well (but there won't be any in usage of this impl.)
    count++;
}
virtual void flush()
virtual void flush() override
{
writer->flush();
}
Expand All @@ -2738,7 +2751,6 @@ class CBucket : public CSimpleInterface
void doSpillHashTable();
bool completed = false;
bool streamed = false;

public:
CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
bool addKey(const void *key, unsigned hashValue);
Expand Down Expand Up @@ -2789,11 +2801,31 @@ class CBucket : public CSimpleInterface
{
return completed;
}
// Returns a statistic for this bucket, combining its row spill and key spill.
// StNumSpills is a fixed 2 (1 for row spill + 1 for key spill); StSizeSpillFile is
// derived from the StSizeDiskWrite value of each spill; the remaining supported
// kinds are summed as-is. Any other kind yields 0.
inline __int64 getStatistic(StatisticKind kind) const
{
    if (StNumSpills == kind)
        return 2; // 1 for row spill + 1 for key spill

    StatisticKind spillKind;
    switch (kind)
    {
        case StCycleDiskWriteIOCycles:
        case StTimeDiskWriteIO:
        case StNumDiskWrites:
            spillKind = kind;
            break;
        case StSizeSpillFile:
            spillKind = StSizeDiskWrite; // spill size is reported as bytes written to disk
            break;
        default:
            return 0;
    }
    return rowSpill.getStatistic(spillKind) + keySpill.getStatistic(spillKind);
}
};

class CBucketHandler : public CSimpleInterface, implements IInterface, implements roxiemem::IBufferedRowCallback
{
protected:
HashDedupSlaveActivityBase &owner;
private:
IThorRowInterfaces *rowIf, *keyIf;
IHash *iRowHash, *iKeyHash;
ICompare *iCompare;
Expand All @@ -2807,6 +2839,11 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
bool callbacksInstalled = false;
unsigned nextBestBucket = 0;
CriticalSection spillCrit;
RelaxedAtomic<__int64> statSizeSpill = 0;
RelaxedAtomic<__int64> statWriteCycles = 0;
RelaxedAtomic<__int64> statWriteNs = 0;
RelaxedAtomic<__int64> statNumWrites = 0;
RelaxedAtomic<__int64> statNumSpills = 0;

rowidx_t getTotalBucketCount() const
{
Expand Down Expand Up @@ -2857,6 +2894,14 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
return false;
}
} postSpillFlush;
// Adds the given bucket's spill statistics to this handler's running totals.
void addBucketStats(CBucket *bucket)
{
    const CBucket &source = *bucket;
    statSizeSpill.add_fetch(source.getStatistic(StSizeSpillFile));
    statWriteCycles.add_fetch(source.getStatistic(StCycleDiskWriteIOCycles));
    statWriteNs.add_fetch(source.getStatistic(StTimeDiskWriteIO));
    statNumWrites.add_fetch(source.getStatistic(StNumDiskWrites));
    statNumSpills.add_fetch(source.getStatistic(StNumSpills));
}
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

Expand Down Expand Up @@ -2903,7 +2948,10 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
{
// If marked as done, then can close now (NB: must be closed before can be read by getNextBestRowStream())
if (bucket->isCompleted())
{
bucket->closeSpillStreams(); // close stream now, to flush rows out in write streams, so ready to be read
addBucketStats(bucket);
}
return true;
}
}
Expand Down Expand Up @@ -2942,7 +2990,10 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
{
CBucket &bucket = *buckets[cur];
if (bucket.isSpilt())
{
bucket.closeSpillStreams(); // close stream now, to flush rows out in write streams, so ready to be read
addBucketStats(&bucket);
}
else
bucket.setCompleted();
}
Expand Down Expand Up @@ -2987,6 +3038,24 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
}
return nullptr;
}
// Returns the accumulated total for the requested statistic, or 0 for any kind
// this handler does not track.
__int64 getStatistic(StatisticKind kind) const
{
    const RelaxedAtomic<__int64> *total = nullptr;
    switch (kind)
    {
        case StSizeSpillFile:
            total = &statSizeSpill;
            break;
        case StCycleDiskWriteIOCycles:
            total = &statWriteCycles;
            break;
        case StTimeDiskWriteIO:
            total = &statWriteNs;
            break;
        case StNumDiskWrites:
            total = &statNumWrites;
            break;
        case StNumSpills:
            total = &statNumSpills;
            break;
        default:
            break;
    }
    if (!total)
        return 0;
    return total->load();
}
};

class HashDedupSlaveActivityBase : public CSlaveActivity
Expand Down Expand Up @@ -3054,7 +3123,7 @@ class HashDedupSlaveActivityBase : public CSlaveActivity
IMPLEMENT_IINTERFACE_USING(CSlaveActivity);

HashDedupSlaveActivityBase(CGraphElementBase *_container, bool _local)
: CSlaveActivity(_container), local(_local)
: CSlaveActivity(_container, hashDedupActivityStatistics), local(_local)
{
helper = (IHThorHashDedupArg *)queryHelper();
initialNumBuckets = 0;
Expand Down Expand Up @@ -3563,6 +3632,7 @@ CBucketHandler::CBucketHandler(HashDedupSlaveActivityBase &_owner, IThorRowInter

CBucketHandler::~CBucketHandler()
{
mergeStats(owner.inactiveStats, this);
if (callbacksInstalled)
{
owner.queryRowManager()->removeRowBuffer(this);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StNumDiskWrites}, basicActivityStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ extern graph_decl const StatisticsMapping indexDistribActivityStatistics;
extern graph_decl const StatisticsMapping soapcallActivityStatistics;

extern graph_decl const StatisticsMapping indexReadFileStatistics;
extern graph_decl const StatisticsMapping hashDedupActivityStatistics;

class BooleanOnOff
{
bool &tf;
Expand Down

0 comments on commit b04690e

Please sign in to comment.