Skip to content

Commit

Permalink
HPCC-30731 Explore effect of compression on transfer times
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Nov 2, 2023
1 parent 9638b37 commit 7289be6
Showing 1 changed file with 107 additions and 32 deletions.
139 changes: 107 additions & 32 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3088,15 +3088,15 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

static constexpr size32_t sz = 100*0x100000; // 100MB
enum CompressOpt { RowCompress, AllRowCompress, BlockCompress };
public:
void test()
{
try
{
size32_t sz = 100*0x100000; // 100MB
MemoryBuffer src;
src.ensureCapacity(sz);
MemoryBuffer compressed;
const char *aesKey = "012345678901234567890123";
Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();

Expand Down Expand Up @@ -3126,53 +3126,128 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
}

DBGLOG("Algorithm || Compression Time (ms) || Decompression Time (ms) || Compression Ratio");
DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);
ForEach(*iter)
{
compressed.clear();
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(handler.queryType(), "randrow"))
if (strieq(type, "randrow"))
continue;
Owned<ICompressor> compressor = handler.getCompressor(streq("AES", handler.queryType()) ? aesKey: nullptr);
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

// Estimates how long it takes to move `size` bytes over a link that sustains
// `bytesPerSecond`, expressed in whole milliseconds.
//   size           - number of bytes to transfer
//   bytesPerSecond - sustained transfer rate; the division below requires it to be non-zero
// The quotient is truncated by the integer division and then narrowed to unsigned.
unsigned transferTimeMs(__int64 size, __int64 bytesPerSecond)
{
    // Scale to milliseconds before dividing so sub-second durations are not lost.
    const __int64 scaledBytes = size * 1000;
    const __int64 elapsedMs = scaledBytes / bytesPerSecond;
    return (unsigned)elapsedMs;
}

// Compresses the buffer at `src` with a compressor obtained from `handler`, expands the
// result again, logs the timings together with projected transfer times, and asserts
// that the expanded data matches the input.
//   handler - supplies the compressor and expander (getCompressor / getExpander) and the type name used in the log
//   options - option string passed to both getCompressor and getExpander; also appended to the logged name
//   rowSz   - number of bytes passed to each write() call in the RowCompress mode
//   srcLen  - length of the input buffer in bytes
//   src     - start of the input buffer
//   opt     - selects how the data is fed to the compressor (RowCompress, AllRowCompress or BlockCompress)
//
// NOTE(review): this listing appears to interleave removed and added lines of a diff - e.g. `timer`
// and `ptrEnd` are each declared twice, `src.bytes()` / `src.length()` / `aesKey` do not match this
// function's parameters, and a `catch` clause appears inside the switch. Confirm against the committed file.
void testCompressor(ICompressHandler &handler, const char * options, size32_t rowSz, size32_t srcLen, const byte * src, CompressOpt opt)
{
Owned<ICompressor> compressor = handler.getCompressor(options);

CCycleTimer timer;
MemoryBuffer compressed;
CCycleTimer timer;
const byte * ptr = src;
switch (opt)
{
// Feed the input to the compressor in rowSz-sized pieces.
case RowCompress:
{
compressor->open(compressed, sz);
compressor->startblock();
const byte *ptr = src.bytes();
const byte *ptrEnd = ptr + src.length();
const byte *ptrEnd = ptr + srcLen;
// The loop only stops when ptr lands exactly on ptrEnd - presumably srcLen is a multiple of rowSz; TODO confirm.
while (ptr != ptrEnd)
{
compressor->write(ptr, rowSz);
ptr += rowSz;
}
compressor->commitblock();
compressor->close();
cycle_t compressCycles = timer.elapsedCycles();

Owned<IExpander> expander = handler.getExpander(streq("AES", handler.queryType()) ? aesKey: nullptr);

timer.reset();
size32_t required = expander->init(compressed.bytes());
MemoryBuffer tgt(required);
expander->expand(tgt.bufferBase());
tgt.setWritePos(required);
cycle_t decompressCycles = timer.elapsedCycles();

float ratio = (float)(src.length()) / compressed.length();

DBGLOG("%9s || %21u || %23u || %17.2f [ %u, %u ]", handler.queryType(), (unsigned)cycle_to_millisec(compressCycles), (unsigned)cycle_to_millisec(decompressCycles), ratio, src.length(), compressed.length());

CPPUNIT_ASSERT(tgt.length() >= sz);
CPPUNIT_ASSERT(0 == memcmp(src.bufferBase(), tgt.bufferBase(), sz));
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
break;
}
// Feed the whole input to the compressor in a single write() call.
// NOTE(review): the write length is `sz` rather than the srcLen parameter - confirm the two are always equal.
case AllRowCompress:
{
compressor->open(compressed, sz);
compressor->startblock();
compressor->write(ptr, sz);
compressor->commitblock();
compressor->close();
break;
}
// Compress with one compressBlock() call into space reserved in `compressed`,
// then set the buffer length to the byte count that call returned.
case BlockCompress:
{
void * target = compressed.reserve(sz);
unsigned written = compressor->compressBlock(sz, target, srcLen, ptr);
compressed.setLength(written);
break;
}
}

// Compression time is captured before the expander is created; the timer is reset
// afterwards so the second measurement covers init() and expand() only.
cycle_t compressCycles = timer.elapsedCycles();
Owned<IExpander> expander = handler.getExpander(options);

timer.reset();
size32_t required = expander->init(compressed.bytes());
MemoryBuffer tgt(required);
expander->expand(tgt.bufferBase());
tgt.setWritePos(required);
cycle_t decompressCycles = timer.elapsedCycles();

float ratio = (float)(srcLen) / compressed.length();

// Logged label is "type(options)".
StringBuffer name(handler.queryType());
if (options)
name.append("(").append(options).append(")");

// Truncate the label to 19 characters, the width of the %19s column in the log line below.
if (name.length() > 19)
name.setLength(19);

unsigned compressTime = (unsigned)cycle_to_millisec(compressCycles);
unsigned decompressTime = (unsigned)cycle_to_millisec(decompressCycles);
unsigned compressedTime = compressTime + decompressTime;
// Time to move the compressed bytes at each of the three modelled rates.
unsigned copyTime200MBs = transferTimeMs(compressed.length(), 200000000);
unsigned copyTime1GBs = transferTimeMs(compressed.length(), 1000000000);
unsigned copyTime5GBs = transferTimeMs(compressed.length(), 5000000000);
// Totals per rate: copy time plus both the compression and the decompression time.
unsigned time200MBs = copyTime200MBs + compressedTime;
unsigned time1GBs = copyTime1GBs + compressedTime;
unsigned time5GBs = copyTime5GBs + compressedTime;
// Each rate column prints total(copy + compress, copy + decompress).
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", name.str(), compressTime, decompressTime,
time200MBs, copyTime200MBs + compressTime, copyTime200MBs + decompressTime,
time1GBs, copyTime1GBs + compressTime, copyTime1GBs + decompressTime,
time5GBs, copyTime5GBs + compressTime, copyTime5GBs + decompressTime,
ratio, compressed.length());

// Round-trip check: the expanded buffer must hold at least sz bytes and its first sz bytes must equal the input.
CPPUNIT_ASSERT(tgt.length() >= sz);
CPPUNIT_ASSERT(0 == memcmp(src, tgt.bufferBase(), sz));
}
};

Expand Down

0 comments on commit 7289be6

Please sign in to comment.