Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jun 14, 2024
2 parents 484b609 + 786aa0d commit 43bb876
Showing 1 changed file with 32 additions and 33 deletions.
65 changes: 32 additions & 33 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
ThorRowQueue *in;
size32_t insz;
ThorRowQueue *out;
Linked<IFile> file;
Owned<IFileIO> fileio;
CFileOwner tmpFileOwner;
Owned<IFileIO> tempFileIO;
SpinLock lock;
bool waiting;
Semaphore waitsem;
Expand Down Expand Up @@ -141,12 +141,12 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
insz = 0;
return;
}
if (!fileio) {
if (!tempFileIO) {
SpinUnblock unblock(lock);
fileio.setown(file->open(IFOcreaterw));
if (!fileio)
tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw));
if (!tempFileIO)
{
throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s", tmpFileOwner.queryIFile().queryFilename());
}
}
MemoryBuffer mb;
Expand Down Expand Up @@ -184,7 +184,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
size32_t left = nb*blocksize-mb.length();
memset(mb.reserve(left),0,left);
}
fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
tempFileIO->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
tmpFileOwner.noteSize(numblocks*blocksize);
mb.clear();
}
if (waiting) {
Expand Down Expand Up @@ -222,8 +223,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
size32_t readBlockSize = nb*blocksize;
byte *buf = (byte *)ma.allocate(readBlockSize);
CThorStreamDeserializerSource ds(readBlockSize,buf);
assertex(fileio.get());
size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
assertex(tempFileIO.get());
size32_t rd = tempFileIO->read(blk*(offset_t)blocksize,readBlockSize,buf);
assertex(rd==readBlockSize);
for (;;) {
byte b;
Expand All @@ -248,8 +249,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif)
: activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
CSmartRowBuffer(CActivityBase *_activity,IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif)
: activity(_activity), tmpFileOwner(_file, _activity->queryTempFileSizeTracker()), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
{
#ifdef _DEBUG
putrecheck = false;
Expand All @@ -263,7 +264,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
numblocks = 0;
insz = 0;
eoi = false;
diskfree.setown(createThreadSafeBitSet());
diskfree.setown(createThreadSafeBitSet());

#ifdef _FULL_TRACE
ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
Expand All @@ -277,18 +278,14 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
#endif
assertex(!waiting);
assertex(!waitflush);
// clear in/out contents
while (in->ordinality())
// clear in/out contents
while (in->ordinality())
ReleaseThorRow(in->dequeue());
delete in;
while (out->ordinality())
while (out->ordinality())
ReleaseThorRow(out->dequeue());
delete out;
if (fileio)
{
fileio.clear();
file->remove();
}
tempFileIO.clear();
}

void putRow(const void *row)
Expand Down Expand Up @@ -1223,8 +1220,8 @@ static StringBuffer &getFileIOStats(StringBuffer &output, IFileIO *iFileIO)

class CSharedWriteAheadDisk : public CSharedWriteAheadBase
{
Owned<IFile> spillFile;
Owned<IFileIO> spillFileIO;
Owned<CFileOwner> tempFileOwner;
Owned<IFileIO> tempFileIO;
CIArrayOf<Chunk> freeChunks;
PointerArrayOf<Chunk> freeChunksSized;
QueueOf<Chunk, false> savedChunks;
Expand Down Expand Up @@ -1446,7 +1443,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
}
else
{
Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
Owned<ISerialStream> stream = createFileSerialStream(tempFileIO, chunk.offset);
#ifdef TRACE_WRITEAHEAD
unsigned diskChunkNum;
stream->get(sizeof(diskChunkNum), &diskChunkNum);
Expand Down Expand Up @@ -1510,7 +1507,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
mb.append((byte)0);
size32_t len = mb.length();
chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
spillFileIO->write(chunk->offset, len, mb.toByteArray());
tempFileIO->write(chunk->offset, len, mb.toByteArray());
tempFileOwner->noteSize(highOffset);
#ifdef TRACE_WRITEAHEAD
ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
#endif
Expand All @@ -1533,21 +1531,21 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
{
assertex(spillName);
spillFile.setown(createIFile(spillName));
spillFile->setShareMode(IFSHnone);
spillFileIO.setown(spillFile->open(IFOcreaterw));
tempFileOwner.setown(activity->createOwnedTempFile(spillName));
tempFileOwner->queryIFile().setShareMode(IFSHnone);
tempFileIO.setown(tempFileOwner->queryIFile().open(IFOcreaterw));
highOffset = 0;
}
~CSharedWriteAheadDisk()
{
if (spillFile)
if (tempFileIO)
{
StringBuffer tracing;
getFileIOStats(tracing, spillFileIO);
activity->ActPrintLog("CSharedWriteAheadDisk: removing spill file: %s%s", spillFile->queryFilename(), tracing.str());
spillFileIO.clear();
spillFile->remove();
getFileIOStats(tracing, tempFileIO);
activity->ActPrintLog("CSharedWriteAheadDisk: removing spill file: %s%s", tempFileOwner->queryIFile().queryFilename(), tracing.str());
tempFileIO.clear();
}

for (;;)
{
Owned<Chunk> chunk = savedChunks.dequeue();
Expand All @@ -1566,7 +1564,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
freeChunks.kill();
freeChunksSized.kill();
highOffset = 0;
spillFileIO->setSize(0);
tempFileIO->setSize(0);
tempFileOwner->noteSize(0);
}
};

Expand Down

0 comments on commit 43bb876

Please sign in to comment.