diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 38054cf60fd..951b7db82d2 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -71,8 +71,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl ThorRowQueue *in; size32_t insz; ThorRowQueue *out; - Linked file; - Owned fileio; + CFileOwner tmpFileOwner; + Owned tempFileIO; SpinLock lock; bool waiting; Semaphore waitsem; @@ -141,12 +141,12 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl insz = 0; return; } - if (!fileio) { + if (!tempFileIO) { SpinUnblock unblock(lock); - fileio.setown(file->open(IFOcreaterw)); - if (!fileio) + tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw)); + if (!tempFileIO) { - throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename()); + throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s", tmpFileOwner.queryIFile().queryFilename()); } } MemoryBuffer mb; @@ -184,7 +184,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t left = nb*blocksize-mb.length(); memset(mb.reserve(left),0,left); } - fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tempFileIO->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tmpFileOwner.noteSize(numblocks*blocksize); mb.clear(); } if (waiting) { @@ -222,8 +223,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t readBlockSize = nb*blocksize; byte *buf = (byte *)ma.allocate(readBlockSize); CThorStreamDeserializerSource ds(readBlockSize,buf); - assertex(fileio.get()); - size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf); + assertex(tempFileIO.get()); + size32_t rd = tempFileIO->read(blk*(offset_t)blocksize,readBlockSize,buf); assertex(rd==readBlockSize); for (;;) { byte b; @@ -248,8 +249,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) - : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) + CSmartRowBuffer(CActivityBase *_activity,IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) + : activity(_activity), tmpFileOwner(_file, _activity->queryTempFileSizeTracker()), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) { #ifdef _DEBUG putrecheck = false; @@ -263,7 +264,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl numblocks = 0; insz = 0; eoi = false; - diskfree.setown(createThreadSafeBitSet()); + diskfree.setown(createThreadSafeBitSet()); #ifdef _FULL_TRACE ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this); @@ -277,18 +278,14 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl #endif assertex(!waiting); assertex(!waitflush); - // clear in/out contents - while (in->ordinality()) + // clear in/out contents + while (in->ordinality()) ReleaseThorRow(in->dequeue()); delete in; - while (out->ordinality()) + while (out->ordinality()) ReleaseThorRow(out->dequeue()); delete out; - if (fileio) - { - fileio.clear(); - file->remove(); - } + tempFileIO.clear(); } void putRow(const void *row) @@ -1223,8 +1220,8 @@ static StringBuffer &getFileIOStats(StringBuffer &output, IFileIO *iFileIO) class CSharedWriteAheadDisk : public CSharedWriteAheadBase { - Owned spillFile; - Owned spillFileIO; + Owned tempFileOwner; + Owned tempFileIO; CIArrayOf freeChunks; PointerArrayOf freeChunksSized; QueueOf savedChunks; @@ -1446,7 +1443,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } else { - Owned stream = createFileSerialStream(spillFileIO, chunk.offset); + Owned stream = createFileSerialStream(tempFileIO, chunk.offset); #ifdef TRACE_WRITEAHEAD unsigned diskChunkNum; stream->get(sizeof(diskChunkNum), &diskChunkNum); @@ -1510,7 +1507,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase mb.append((byte)0); size32_t len = mb.length(); chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list - spillFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileOwner->noteSize(highOffset); #ifdef TRACE_WRITEAHEAD ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len); #endif @@ -1533,21 +1531,21 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()) { assertex(spillName); - spillFile.setown(createIFile(spillName)); - spillFile->setShareMode(IFSHnone); - spillFileIO.setown(spillFile->open(IFOcreaterw)); + tempFileOwner.setown(activity->createOwnedTempFile(spillName)); + tempFileOwner->queryIFile().setShareMode(IFSHnone); + tempFileIO.setown(tempFileOwner->queryIFile().open(IFOcreaterw)); highOffset = 0; } ~CSharedWriteAheadDisk() { - if (spillFile) + if (tempFileIO) { StringBuffer tracing; - getFileIOStats(tracing, spillFileIO); - activity->ActPrintLog("CSharedWriteAheadDisk: removing spill file: %s%s", spillFile->queryFilename(), tracing.str()); - spillFileIO.clear(); - spillFile->remove(); + getFileIOStats(tracing, tempFileIO); + activity->ActPrintLog("CSharedWriteAheadDisk: removing spill file: %s%s", tempFileOwner->queryIFile().queryFilename(), tracing.str()); + tempFileIO.clear(); } + for (;;) { Owned chunk = savedChunks.dequeue(); @@ -1566,7 +1564,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase freeChunks.kill(); freeChunksSized.kill(); highOffset = 0; - spillFileIO->setSize(0); + tempFileIO->setSize(0); + tempFileOwner->noteSize(0); } };