Skip to content

Commit

Permalink
Merge pull request #18808 from ghalliday/issue32136
Browse files Browse the repository at this point in the history
HPCC-32136 Allow the input and output to be replaced in a buffered class

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 27, 2024
2 parents 30cae58 + 6835477 commit d53e19f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
24 changes: 20 additions & 4 deletions system/jlib/jstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ IByteInputStream *createInputStream(int handle)
// This means the buffer size is likely to be bigger than the block size - the class is passed
// an initial estimate for the potential overlap.

class CBlockedSerialInputStream : public CInterfaceOf<IBufferedSerialInputStream>
class CBlockedSerialInputStream final : public CInterfaceOf<IBufferedSerialInputStream>
{
public:
CBlockedSerialInputStream(ISerialInputStream * _input, size32_t _blockReadSize)
Expand Down Expand Up @@ -318,7 +318,7 @@ class CBlockedSerialInputStream : public CInterfaceOf<IBufferedSerialInputStream
}

//While there are blocks larger than the buffer size read directly into the target buffer
while (sizeRead + blockReadSize <= len)
while (unlikely(sizeRead + blockReadSize <= len))
{
size32_t got = readNextBlock(blockReadSize, target+sizeRead);
if ((got == 0) || (got == BufferTooSmall))
Expand All @@ -327,7 +327,7 @@ class CBlockedSerialInputStream : public CInterfaceOf<IBufferedSerialInputStream
nextBlockOffset += got;
}

while ((sizeRead < len) && !endOfStream)
while (likely((sizeRead < len) && !endOfStream))
{
assertex(bufferOffset == dataLength);
// NOTE: This could read less than a block, even if a whole block was requested.
Expand Down Expand Up @@ -368,7 +368,7 @@ class CBlockedSerialInputStream : public CInterfaceOf<IBufferedSerialInputStream
virtual void get(size32_t len, void * ptr) override
{
size32_t numRead = read(len, ptr);
if (numRead != len)
if (unlikely(numRead != len))
throw makeStringExceptionV(-1, "End of input stream for read of %u bytes at offset %llu", len, tell()-numRead);
}

Expand Down Expand Up @@ -405,6 +405,11 @@ class CBlockedSerialInputStream : public CInterfaceOf<IBufferedSerialInputStream
input->reset(_offset, _flen);
}

virtual void replaceInput(ISerialInputStream * newInput) override
{
input.set(newInput);
}

protected:
inline byte * data(size32_t offset) { return (byte *)buffer.get() + offset; }
inline size32_t available() const { return dataLength - bufferOffset; }
Expand Down Expand Up @@ -792,6 +797,11 @@ class CBlockedSerialOutputStream final : public CInterfaceOf<IBufferedSerialOutp

virtual offset_t tell() const override { return blockOffset+bufferOffset; }

virtual void replaceOutput(ISerialOutputStream * newOutput) override
{
output.set(newOutput);
}

//-------------------------------------------------------
//Helper functions for CThreadedBlockedSerialOutputStream
//doCommit() and doResume are also used by this class
Expand Down Expand Up @@ -1007,6 +1017,12 @@ class CThreadedBlockedSerialOutputStream final : public CInterfaceOf<IBufferedSe
checkForPendingWrite();
}

virtual void replaceOutput(ISerialOutputStream * newOutput) override
{
stream[0].replaceOutput(newOutput);
stream[1].replaceOutput(newOutput);
}

protected:
virtual offset_t tell() const override
{
Expand Down
6 changes: 5 additions & 1 deletion system/jlib/jstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ interface ISerialStream : extends ISerialInputStream
// if got<wanted then approaching eof
// if got>wanted then got is size available in buffer
};
using IBufferedSerialInputStream = ISerialStream;
interface IBufferedSerialInputStream : extends ISerialStream
{
virtual void replaceInput(ISerialInputStream * newInput) = 0;
};

/* example of reading a nul terminated string using ISerialStream peek and skip
{
Expand Down Expand Up @@ -100,6 +103,7 @@ interface IBufferedSerialOutputStream : extends ISerialOutputStream
virtual void commit(size32_t written) = 0 ; // commit the data written to the block returned by reserve
virtual void suspend(size32_t wanted) = 0; // Reserve some bytes and prevent data being flushed to the next stage until endNested is called. May nest.
virtual void resume(size32_t len, const void * ptr) = 0; // update the data allocated by suspend and allow flushing.
virtual void replaceOutput(ISerialOutputStream * newOutput) = 0;
};

interface ICompressor;
Expand Down
1 change: 1 addition & 0 deletions testing/unittests/jstreamtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ class NullOuputStream : public CInterfaceOf<IBufferedSerialOutputStream>
virtual void suspend(size32_t wanted) {}
virtual void resume(size32_t len, const void * ptr) {}
virtual offset_t tell() const override { return 0; }
virtual void replaceOutput(ISerialOutputStream * newOutput) override {}
};

class JlibStreamStressTest : public CppUnit::TestFixture
Expand Down

0 comments on commit d53e19f

Please sign in to comment.