diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index c217c5753a2..b2e5f034417 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -95,6 +95,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl size32_t fixedEstSize; Owned pipewr; Owned piperd; + mutable CriticalSection critPiperd; protected: /* @@ -1189,21 +1190,24 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ihash = _ihash; iCompare = _iCompare; keepBestCompare = _keepBestCompare; - if (allowSpill) { - StringBuffer temp; - GetTempFilePath(temp,"hddrecvbuff"); - if (newLookAhead) + CriticalBlock block(critPiperd); + if (allowSpill) { - options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults - ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr; - piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler)); + StringBuffer temp; + GetTempFilePath(temp,"hddrecvbuff"); + if (newLookAhead) + { + options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults + ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr; + piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler)); + } + else + piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); } else - piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); + piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize)); } - else - piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize)); pipewr.set(piperd->queryWriter()); connected = true; @@ -1245,7 +1249,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl deserializer = NULL; fixedEstSize = 0; input.clear(); - piperd.clear(); + { + CriticalBlock block(critPiperd); + piperd.clear(); + } pipewr.clear(); ihash = NULL; iCompare = NULL; @@ -1447,6 +1454,9 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl virtual void mergeStats(CRuntimeStatisticCollection &stats) const { sender.mergeStats(stats); + CriticalBlock block(critPiperd); + if (piperd) + mergeRemappedStats(stats, piperd, diskToTempStatsMap); } // IExceptionHandler impl. virtual bool fireException(IException *e) diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 1abc71cdcfe..ba6163fbba1 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -76,6 +76,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl ThorRowQueue *out; CFileOwner tmpFileOwner; Owned tempFileIO; + mutable CriticalSection critTmpFileIO; SpinLock lock; bool waiting; Semaphore waitsem; @@ -146,6 +147,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl } if (!tempFileIO) { SpinUnblock unblock(lock); + CriticalBlock block(critTmpFileIO); tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw)); if (!tempFileIO) { @@ -424,6 +426,14 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl { return this; } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override + { + CriticalBlock block(critTmpFileIO); + if (tempFileIO) + return tempFileIO->getStatistic(kind); + else + return 0; + } }; @@ -607,6 +617,10 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff { return this; } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override + { + return 0; + } }; @@ -708,6 +722,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, memsize_t pendingFlushToDiskSz = 0; CFileOwner *currentOwnedOutputFile = nullptr; Owned currentOutputIFileIO; // keep for stats + mutable CriticalSection critCurrentOutputIFileIO; CriticalSection outputFilesQCS; std::queue outputFiles; unsigned writeTempFileNum = 0; @@ -733,6 +748,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, RowEntry readFromStreamMarker = { nullptr, 0, 0 }; // misc + CRuntimeStatisticCollection inactiveStats; bool grouped = false; // ctor input parameter CriticalSection readerWriterCS; #ifdef STRESSTEST_SPILLING_ROWSTREAM @@ -766,7 +782,12 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, auto res = createSerialOutputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize outputStream.setown(std::get<0>(res)); - currentOutputIFileIO.setown(std::get<1>(res)); + { + CriticalBlock b(critCurrentOutputIFileIO); + if (currentOutputIFileIO) + mergeStats(inactiveStats, currentOutputIFileIO); + currentOutputIFileIO.setown(std::get<1>(res)); + } outputStreamSerializer = std::make_unique(outputStream); } void createNextInputStream() @@ -1005,7 +1026,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, explicit CCompressedSpillingRowStream(CActivityBase *_activity, const char *_baseTmpFilename, bool _grouped, IThorRowInterfaces *rowIf, const LookAheadOptions &_options, ICompressHandler *_compressHandler) : activity(*_activity), baseTmpFilename(_baseTmpFilename), grouped(_grouped), options(_options), compressHandler(_compressHandler), - meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) + meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), + inactiveStats(tempFileStatistics) { size32_t minSize = meta->getMinRecordSize(); @@ -1053,6 +1075,14 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, { return this; } + virtual unsigned __int64 getStatistic(StatisticKind kind) const + { + unsigned __int64 v = inactiveStats.queryStatistic(kind).get(); + CriticalBlock b(critCurrentOutputIFileIO); + if (currentOutputIFileIO) + v += currentOutputIFileIO->getStatistic(kind); + return v; + } // IRowStream virtual const void *nextRow() override { diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index 69a28f04824..76dfa3cbcfd 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -61,6 +61,7 @@ struct LookAheadOptions : CommonBufferRowRWStreamOptions interface ISmartRowBuffer: extends IRowStream { virtual IRowWriter *queryWriter() = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; }; class CActivityBase;