diff --git a/common/thorhelper/thorcommon.cpp b/common/thorhelper/thorcommon.cpp index 5fbbe5f7f7a..fcda7b27ac5 100644 --- a/common/thorhelper/thorcommon.cpp +++ b/common/thorhelper/thorcommon.cpp @@ -1621,6 +1621,10 @@ class CRowStreamWriter : private IRowSerializerTarget, implements IExtRowWriter, --nested; } + virtual unsigned __int64 getStatistic(StatisticKind kind) override + { + return stream->getStatistic(kind); + } }; #ifdef TRACE_CREATE @@ -1648,10 +1652,19 @@ IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned fla return createRowWriter(iFileIO, rowIf, flags); } -IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags, size32_t compressorBlkSz) +IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor, size32_t compressorBlkSz) { + Owned compressedFileIO; if (TestRwFlag(flags, rw_compress)) - throw MakeStringException(0, "Unsupported createRowWriter flags"); + { + size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize(); + if (fixedSize && TestRwFlag(flags, rw_grouped)) + ++fixedSize; // row writer will include a grouping byte + compressedFileIO.setown(createCompressedFileWriter(iFileIO, TestRwFlag(flags, rw_compressblkcrc), fixedSize, TestRwFlag(flags, rw_compressblkcrc), compressor, getCompMethod(flags))); + if (compressorBlkSz) + compressedFileIO->setBlockSize(compressorBlkSz); + iFileIO = compressedFileIO.get(); + } Owned stream; if (TestRwFlag(flags, rw_buffered)) stream.setown(createBufferedIOStream(iFileIO)); @@ -1659,7 +1672,7 @@ IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned stream.setown(createIOStream(iFileIO)); if (flags & rw_extend) stream->seek(0, IFSend); - flags &= ~((unsigned)(rw_extend|rw_buffered)); + flags &= ~((unsigned)(rw_extend|rw_buffered|COMP_MASK)); return createRowWriter(stream, rowIf, flags); } diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index dd833e7977f..6c60a5b1b83 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -172,6 +172,7 @@ interface IExtRowWriter: extends IRowWriter virtual offset_t getPosition() = 0; using IRowWriter::flush; virtual void flush(CRC32 *crcout) = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; }; enum EmptyRowSemantics { ers_forbidden, ers_allow, ers_eogonly }; @@ -210,7 +211,7 @@ extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback = nullptr); interface ICompressor; extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL, size32_t compressorBlkSz=0); -extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, size32_t compressorBlkSz=0); +extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=nullptr, size32_t compressorBlkSz=0); extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered interface THORHELPER_API IDiskMerger : extends IInterface diff --git a/thorlcr/activities/hashdistrib/thhashdistrib.cpp b/thorlcr/activities/hashdistrib/thhashdistrib.cpp index 8a347c9b3fe..7ec268b8515 100644 --- a/thorlcr/activities/hashdistrib/thhashdistrib.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistrib.cpp @@ -75,6 +75,12 @@ class HashDistributeActivityMaster : public HashDistributeMasterBase HashDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeMasterBase(mode, info) { } }; +class HashDedupActivityMaster : public HashDistributeMasterBase +{ +public: + HashDedupActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeMasterBase(mode, info, hashDedupActivityStatistics) { } +}; + class HashJoinDistributeActivityMaster : public HashDistributeMasterBase { public: @@ -233,7 +239,7 @@ CActivityBase *createHashDedupMergeActivityMaster(CMasterGraphElement *container if (container->queryLocalOrGrouped()) return new CMasterActivity(container); else - return new HashDistributeActivityMaster(DM_dedup, container); + return new HashDedupActivityMaster(DM_dedup, container); } CActivityBase *createHashJoinActivityMaster(CMasterGraphElement *container) diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 6413f395132..e6c53ceedee 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -77,6 +77,7 @@ #define HDSendPrintLog5(M,P1,P2,P3,P4) #endif + class CDistributorBase : implements IHashDistributor, implements IExceptionHandler, public CInterface { Linked rowIf; @@ -2649,6 +2650,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface IThorRowInterfaces *rowIf; rowcount_t count; Owned spillFile; + Owned spillFileIO; IRowWriter *writer; StringAttr desc; unsigned bucketN, rwFlags; @@ -2683,9 +2685,10 @@ class CSpill : implements IRowWriter, public CSimpleInterface owner.getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType); setCompFlag(compType, rwFlags); } - writer = createRowWriter(iFile, rowIf, rwFlags); + spillFileIO.setown(iFile->open((rwFlags & rw_extend)?IFOwrite:IFOcreate)); + writer = createRowWriter(spillFileIO, rowIf, rwFlags); } - IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detatches ownership of 'fileOwner' + IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detaches ownership of 'fileOwner' { assertex(NULL == writer); // should have been closed Owned fileOwner = spillFile.getClear(); @@ -2706,17 +2709,27 @@ class CSpill : implements IRowWriter, public CSimpleInterface { if (NULL == writer) return; - flush(); + writer->flush(); + spillFileIO->flush(); ::Release(writer); writer = NULL; } + bool isClosed() const + { + return (writer==NULL); + } + unsigned __int64 getStatistic(StatisticKind kind) const + { + return spillFileIO->getStatistic(kind); + } + // IRowWriter virtual void putRow(const void *row) { writer->putRow(row); ++count; // NULL's too (but there won't be any in usage of this impl.) } - virtual void flush() + virtual void flush() override { writer->flush(); } @@ -2738,7 +2751,6 @@ class CBucket : public CSimpleInterface void doSpillHashTable(); bool completed = false; bool streamed = false; - public: CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows); bool addKey(const void *key, unsigned hashValue); @@ -2789,11 +2801,31 @@ class CBucket : public CSimpleInterface { return completed; } + inline __int64 getStatistic(StatisticKind kind) const + { + switch (kind) + { + case StCycleDiskWriteIOCycles: + return rowSpill.getStatistic(StCycleDiskWriteIOCycles) + keySpill.getStatistic(StCycleDiskWriteIOCycles); + case StTimeDiskWriteIO: + return rowSpill.getStatistic(StTimeDiskWriteIO) + keySpill.getStatistic(StTimeDiskWriteIO); + case StSizeSpillFile: + return rowSpill.getStatistic(StSizeDiskWrite) + keySpill.getStatistic(StSizeDiskWrite); + case StNumDiskWrites: + return rowSpill.getStatistic(StNumDiskWrites) + keySpill.getStatistic(StNumDiskWrites); + case StNumSpills: + return 2; // 1 for row spill + 1 for key spill + default: + return 0; + } + } }; class CBucketHandler : public CSimpleInterface, implements IInterface, implements roxiemem::IBufferedRowCallback { +protected: HashDedupSlaveActivityBase &owner; +private: IThorRowInterfaces *rowIf, *keyIf; IHash *iRowHash, *iKeyHash; ICompare *iCompare; @@ -2807,6 +2839,11 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement bool callbacksInstalled = false; unsigned nextBestBucket = 0; CriticalSection spillCrit; + RelaxedAtomic<__int64> statSizeSpill = 0; + RelaxedAtomic<__int64> statWriteCycles = 0; + RelaxedAtomic<__int64> statWriteNs = 0; + RelaxedAtomic<__int64> statNumWrites = 0; + RelaxedAtomic<__int64> statNumSpills = 0; rowidx_t getTotalBucketCount() const { @@ -2857,6 +2894,14 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement return false; } } postSpillFlush; + void addBucketStats(CBucket *bucket) + { + statSizeSpill.add_fetch(bucket->getStatistic(StSizeSpillFile)); + statWriteCycles.add_fetch(bucket->getStatistic(StCycleDiskWriteIOCycles)); + statWriteNs.add_fetch(bucket->getStatistic(StTimeDiskWriteIO)); + statNumWrites.add_fetch(bucket->getStatistic(StNumDiskWrites)); + statNumSpills.add_fetch(bucket->getStatistic(StNumSpills)); + } public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); @@ -2903,7 +2948,10 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement { // If marked as done, then can close now (NB: must be closed before can be read by getNextBestRowStream()) if (bucket->isCompleted()) + { bucket->closeSpillStreams(); // close stream now, to flush rows out in write streams, so ready to be read + addBucketStats(bucket); + } return true; } } @@ -2942,7 +2990,10 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement { CBucket &bucket = *buckets[cur]; if (bucket.isSpilt()) + { bucket.closeSpillStreams(); // close stream now, to flush rows out in write streams, so ready to be read + addBucketStats(&bucket); + } else bucket.setCompleted(); } @@ -2987,6 +3038,24 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement } return nullptr; } + __int64 getStatistic(StatisticKind kind) const + { + switch(kind) + { + case StSizeSpillFile: + return statSizeSpill.load(); + case StCycleDiskWriteIOCycles: + return statWriteCycles.load(); + case StTimeDiskWriteIO: + return statWriteNs.load(); + case StNumDiskWrites: + return statNumWrites.load(); + case StNumSpills: + return statNumSpills.load(); + default: + return 0; + } + } }; class HashDedupSlaveActivityBase : public CSlaveActivity @@ -3054,7 +3123,7 @@ class HashDedupSlaveActivityBase : public CSlaveActivity IMPLEMENT_IINTERFACE_USING(CSlaveActivity); HashDedupSlaveActivityBase(CGraphElementBase *_container, bool _local) - : CSlaveActivity(_container), local(_local) + : CSlaveActivity(_container, hashDedupActivityStatistics), local(_local) { helper = (IHThorHashDedupArg *)queryHelper(); initialNumBuckets = 0; @@ -3563,6 +3632,7 @@ CBucketHandler::CBucketHandler(HashDedupSlaveActivityBase &_owner, IThorRowInter CBucketHandler::~CBucketHandler() { + mergeStats(owner.inactiveStats, this); if (callbacksInstalled) { owner.queryRowManager()->removeRowBuffer(this); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c2598961693..33550095b0e 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -92,6 +92,7 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); +const StatisticsMapping hashDedupActivityStatistics({StTimeSpillElapsed, StNumSpills, StSizeSpillFile, StNumDiskWrites}, basicActivityStatistics); MODULE_INIT(INIT_PRIORITY_STANDARD) { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ea131972dd0..c26fd2ce200 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -155,6 +155,8 @@ extern graph_decl const StatisticsMapping indexDistribActivityStatistics; extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; +extern graph_decl const StatisticsMapping hashDedupActivityStatistics; + class BooleanOnOff { bool &tf;