Skip to content

Commit

Permalink
Merge pull request #18831 from shamser/issue31984
Browse files Browse the repository at this point in the history
HPCC-31984 Capture additional stats from CSmartRowBuffer temp files

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 18, 2024
2 parents 78461cb + 265a2ea commit 33db4c0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 13 deletions.
32 changes: 21 additions & 11 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
size32_t fixedEstSize;
Owned<IRowWriter> pipewr;
Owned<ISmartRowBuffer> piperd;
mutable CriticalSection critPiperd;

protected:
/*
Expand Down Expand Up @@ -1189,21 +1190,24 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = _ihash;
iCompare = _iCompare;
keepBestCompare = _keepBestCompare;
if (allowSpill)
{
StringBuffer temp;
GetTempFilePath(temp,"hddrecvbuff");
if (newLookAhead)
CriticalBlock block(critPiperd);
if (allowSpill)
{
options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults
ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr;
piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler));
StringBuffer temp;
GetTempFilePath(temp,"hddrecvbuff");
if (newLookAhead)
{
options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults
ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr;
piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler));
}
else
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
}
else
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize));
}
else
piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize));

pipewr.set(piperd->queryWriter());
connected = true;
Expand Down Expand Up @@ -1245,7 +1249,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
deserializer = NULL;
fixedEstSize = 0;
input.clear();
piperd.clear();
{
CriticalBlock block(critPiperd);
piperd.clear();
}
pipewr.clear();
ihash = NULL;
iCompare = NULL;
Expand Down Expand Up @@ -1447,6 +1454,9 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
Expand Down
34 changes: 32 additions & 2 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
ThorRowQueue *out;
CFileOwner tmpFileOwner;
Owned<IFileIO> tempFileIO;
mutable CriticalSection critTmpFileIO;
SpinLock lock;
bool waiting;
Semaphore waitsem;
Expand Down Expand Up @@ -146,6 +147,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
}
if (!tempFileIO) {
SpinUnblock unblock(lock);
CriticalBlock block(critTmpFileIO);
tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw));
if (!tempFileIO)
{
Expand Down Expand Up @@ -424,6 +426,14 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
{
return this;
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
CriticalBlock block(critTmpFileIO);
if (tempFileIO)
return tempFileIO->getStatistic(kind);
else
return 0;
}
};


Expand Down Expand Up @@ -607,6 +617,10 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
{
return this;
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
return 0;
}
};


Expand Down Expand Up @@ -708,6 +722,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
memsize_t pendingFlushToDiskSz = 0;
CFileOwner *currentOwnedOutputFile = nullptr;
Owned<IFileIO> currentOutputIFileIO; // keep for stats
mutable CriticalSection critCurrentOutputIFileIO;
CriticalSection outputFilesQCS;
std::queue<CFileOwner *> outputFiles;
unsigned writeTempFileNum = 0;
Expand All @@ -733,6 +748,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
RowEntry readFromStreamMarker = { nullptr, 0, 0 };

// misc
CRuntimeStatisticCollection inactiveStats;
bool grouped = false; // ctor input parameter
CriticalSection readerWriterCS;
#ifdef STRESSTEST_SPILLING_ROWSTREAM
Expand Down Expand Up @@ -766,7 +782,12 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,

auto res = createSerialOutputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize
outputStream.setown(std::get<0>(res));
currentOutputIFileIO.setown(std::get<1>(res));
{
CriticalBlock b(critCurrentOutputIFileIO);
if (currentOutputIFileIO)
mergeStats(inactiveStats, currentOutputIFileIO);
currentOutputIFileIO.setown(std::get<1>(res));
}
outputStreamSerializer = std::make_unique<COutputStreamSerializer>(outputStream);
}
void createNextInputStream()
Expand Down Expand Up @@ -1005,7 +1026,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,

explicit CCompressedSpillingRowStream(CActivityBase *_activity, const char *_baseTmpFilename, bool _grouped, IThorRowInterfaces *rowIf, const LookAheadOptions &_options, ICompressHandler *_compressHandler)
: activity(*_activity), baseTmpFilename(_baseTmpFilename), grouped(_grouped), options(_options), compressHandler(_compressHandler),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer())
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(tempFileStatistics)
{
size32_t minSize = meta->getMinRecordSize();

Expand Down Expand Up @@ -1053,6 +1075,14 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
{
return this;
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const
{
unsigned __int64 v = inactiveStats.queryStatistic(kind).get();
CriticalBlock b(critCurrentOutputIFileIO);
if (currentOutputIFileIO)
v += currentOutputIFileIO->getStatistic(kind);
return v;
}
// IRowStream
virtual const void *nextRow() override
{
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct LookAheadOptions : CommonBufferRowRWStreamOptions
interface ISmartRowBuffer: extends IRowStream
{
virtual IRowWriter *queryWriter() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
};

class CActivityBase;
Expand Down

0 comments on commit 33db4c0

Please sign in to comment.