From 7289be62c5e4d6f3d0c732ee3cc264dd391e0428 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Wed, 1 Nov 2023 15:39:54 +0000 Subject: [PATCH] HPCC-30731 Explore effect of compression on transfer times Signed-off-by: Gavin Halliday --- testing/unittests/jlibtests.cpp | 139 ++++++++++++++++++++++++-------- 1 file changed, 107 insertions(+), 32 deletions(-) diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index 0f3a99b29fe..28848554bd3 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -3088,15 +3088,15 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture CPPUNIT_TEST(test); CPPUNIT_TEST_SUITE_END(); + static constexpr size32_t sz = 100*0x100000; // 100MB + enum CompressOpt { RowCompress, AllRowCompress, BlockCompress }; public: void test() { try { - size32_t sz = 100*0x100000; // 100MB MemoryBuffer src; src.ensureCapacity(sz); - MemoryBuffer compressed; const char *aesKey = "012345678901234567890123"; Owned iter = getCompressHandlerIterator(); @@ -3126,22 +3126,65 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture } } - DBGLOG("Algorithm || Compression Time (ms) || Decompression Time (ms) || Compression Ratio"); + DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]"); + DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||"); + unsigned time200MBs = transferTimeMs(sz, 200000000); + unsigned time1GBs = transferTimeMs(sz, 1000000000); + unsigned time5GBs = transferTimeMs(sz, 5000000000); + DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0, + time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz); ForEach(*iter) { - compressed.clear(); ICompressHandler &handler = iter->query(); + const char * type = handler.queryType(); //Ignore unusual compressors with no expanders... - if (strieq(handler.queryType(), "randrow")) + if (strieq(type, "randrow")) continue; - Owned compressor = handler.getCompressor(streq("AES", handler.queryType()) ? aesKey: nullptr); + const char * options = streq("AES", handler.queryType()) ? aesKey: ""; + if (streq(type, "LZ4HC")) + { + testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress); + testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress); + testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress); + testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress); + testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress); + testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress); + } + testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress); + if (streq(type, "LZ4")) + { + testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing + testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing + } + } + } + catch (IException *e) + { + EXCLOG(e, nullptr); + throw; + } + } + + unsigned transferTimeMs(__int64 size, __int64 bytesPerSecond) + { + return (unsigned)((size * 1000) / bytesPerSecond); + } + + void testCompressor(ICompressHandler &handler, const char * options, size32_t rowSz, size32_t srcLen, const byte * src, CompressOpt opt) + { + Owned compressor = handler.getCompressor(options); - CCycleTimer timer; + MemoryBuffer compressed; + CCycleTimer timer; + const byte * ptr = src; + switch (opt) + { + case RowCompress: + { compressor->open(compressed, sz); compressor->startblock(); - const byte *ptr = src.bytes(); - const byte *ptrEnd = ptr + src.length(); + const byte *ptrEnd = ptr + srcLen; while (ptr != ptrEnd) { compressor->write(ptr, rowSz); @@ -3149,30 +3192,62 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture } compressor->commitblock(); compressor->close(); - cycle_t compressCycles = timer.elapsedCycles(); - - Owned expander = handler.getExpander(streq("AES", handler.queryType()) ? aesKey: nullptr); - - timer.reset(); - size32_t required = expander->init(compressed.bytes()); - MemoryBuffer tgt(required); - expander->expand(tgt.bufferBase()); - tgt.setWritePos(required); - cycle_t decompressCycles = timer.elapsedCycles(); - - float ratio = (float)(src.length()) / compressed.length(); - - DBGLOG("%9s || %21u || %23u || %17.2f [ %u, %u ]", handler.queryType(), (unsigned)cycle_to_millisec(compressCycles), (unsigned)cycle_to_millisec(decompressCycles), ratio, src.length(), compressed.length()); - - CPPUNIT_ASSERT(tgt.length() >= sz); - CPPUNIT_ASSERT(0 == memcmp(src.bufferBase(), tgt.bufferBase(), sz)); - } - } - catch (IException *e) - { - EXCLOG(e, nullptr); - throw; + break; + } + case AllRowCompress: + { + compressor->open(compressed, sz); + compressor->startblock(); + compressor->write(ptr, sz); + compressor->commitblock(); + compressor->close(); + break; + } + case BlockCompress: + { + void * target = compressed.reserve(sz); + unsigned written = compressor->compressBlock(sz, target, srcLen, ptr); + compressed.setLength(written); + break; + } } + + cycle_t compressCycles = timer.elapsedCycles(); + Owned expander = handler.getExpander(options); + + timer.reset(); + size32_t required = expander->init(compressed.bytes()); + MemoryBuffer tgt(required); + expander->expand(tgt.bufferBase()); + tgt.setWritePos(required); + cycle_t decompressCycles = timer.elapsedCycles(); + + float ratio = (float)(srcLen) / compressed.length(); + + StringBuffer name(handler.queryType()); + if (options) + name.append("(").append(options).append(")"); + + if (name.length() > 19) + name.setLength(19); + + unsigned compressTime = (unsigned)cycle_to_millisec(compressCycles); + unsigned decompressTime = (unsigned)cycle_to_millisec(decompressCycles); + unsigned compressedTime = compressTime + decompressTime; + unsigned copyTime200MBs = transferTimeMs(compressed.length(), 200000000); + unsigned copyTime1GBs = transferTimeMs(compressed.length(), 1000000000); + unsigned copyTime5GBs = transferTimeMs(compressed.length(), 5000000000); + unsigned time200MBs = copyTime200MBs + compressedTime; + unsigned time1GBs = copyTime1GBs + compressedTime; + unsigned time5GBs = copyTime5GBs + compressedTime; + DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", name.str(), compressTime, decompressTime, + time200MBs, copyTime200MBs + compressTime, copyTime200MBs + decompressTime, + time1GBs, copyTime1GBs + compressTime, copyTime1GBs + decompressTime, + time5GBs, copyTime5GBs + compressTime, copyTime5GBs + decompressTime, + ratio, compressed.length()); + + CPPUNIT_ASSERT(tgt.length() >= sz); + CPPUNIT_ASSERT(0 == memcmp(src, tgt.bufferBase(), sz)); } };