From 4f2cb5701625b3bf795c7a486b5f21f9e10d4258 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 1 Nov 2023 14:55:17 +0000 Subject: [PATCH] Use LZ4HC rather than LZW to compress Roxie continuation data Signed-off-by: Richard Chapman --- roxie/ccd/ccdactivities.cpp | 13 ++++++---- system/jlib/jlzw.cpp | 37 +++++++++++++++++++++++++---- system/jlib/jlzw.hpp | 2 ++ testing/unittests/unittests.cpp | 42 +++++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 8 deletions(-) diff --git a/roxie/ccd/ccdactivities.cpp b/roxie/ccd/ccdactivities.cpp index 3d0f364e604..7515d6a7ab9 100644 --- a/roxie/ccd/ccdactivities.cpp +++ b/roxie/ccd/ccdactivities.cpp @@ -44,6 +44,7 @@ #include "thorcommon.ipp" #include "thorstrand.hpp" #include "jstats.h" +#include "jlz4.hpp" using roxiemem::OwnedRoxieRow; using roxiemem::OwnedConstRoxieRow; @@ -2707,7 +2708,8 @@ class CRoxieIndexActivity : public CRoxieKeyedActivity compressed.append(siLen); // Leaving space to patch when size known compressed.append(true); compressed.append(lastRowCompleteMatch); // This field is not compressed - see above! - compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length()); + Owned compressor = createLZ4Compressor("hclevel=3", true); + compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length(), compressor); bool report = logctx.queryTraceLevel() && (doTrace(traceRoxiePackets) || si.length() >= continuationWarnThreshold); if (report) logctx.CTXLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length()); @@ -2733,7 +2735,8 @@ class CRoxieIndexActivity : public CRoxieKeyedActivity if (isCompressed) { MemoryBuffer decompressed; - decompressToBuffer(decompressed, resentInfo); + Owned expander = createLZ4Expander(); + decompressToBuffer(decompressed, resentInfo, expander); if (doTrace(traceRoxiePackets)) logctx.CTXLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length()); resentInfo.swapWith(decompressed); @@ -4120,7 +4123,8 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity MemoryBuffer compressed; compressed.append(siLen); // Leaving space to patch when size known compressed.append(true); - compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length()); + Owned compressor = createLZ4Compressor("hclevel=3", true); + compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length(), compressor); bool report = logctx.queryTraceLevel() && (doTrace(traceRoxiePackets) || si.length() >= continuationWarnThreshold); if (report) DBGLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length()); @@ -4145,7 +4149,8 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity if (isCompressed) { MemoryBuffer decompressed; - decompressToBuffer(decompressed, resentInfo); + Owned expander = createLZ4Expander(); + decompressToBuffer(decompressed, resentInfo, expander); if (doTrace(traceRoxiePackets)) DBGLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length()); resentInfo.swapWith(decompressed); diff --git a/system/jlib/jlzw.cpp b/system/jlib/jlzw.cpp index 2dbf48aca62..57620044110 100644 --- a/system/jlib/jlzw.cpp +++ b/system/jlib/jlzw.cpp @@ -765,7 +765,7 @@ void appendToBuffer(MemoryBuffer & out, size32_t len, const void * src) out.append(len, src); } -void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src) +void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src, ICompressor *compressor) { unsigned originalLength = out.length(); out.append(true); @@ -774,7 +774,6 @@ void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src) if (len >= 32) { size32_t newSize = len * 4 / 5; // Copy if compresses less than 80% ... - Owned compressor = createLZWCompressor(); void *newData = out.reserve(newSize); compressor->open(newData, newSize); if (compressor->write(src, len)==len) @@ -793,15 +792,34 @@ void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src) appendToBuffer(out, len, src); } -void decompressToBuffer(MemoryBuffer & out, const void * src) +void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src) +{ + if (len < 32) + { + out.append(false); + appendToBuffer(out, len, src); + } + else + { + Owned compressor = createLZWCompressor(); + compressToBuffer(out, len, src, compressor); + } +} + +void decompressToBuffer(MemoryBuffer & out, const void * src, IExpander *expander) { - Owned expander = createLZWExpander(); unsigned outSize = expander->init(src); void * buff = out.reserve(outSize); expander->expand(buff); } +void decompressToBuffer(MemoryBuffer & out, const void * src) +{ + Owned expander = createLZWExpander(); + decompressToBuffer(out, src, expander); +} + void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in) { bool compressed; @@ -813,6 +831,17 @@ void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in) out.append(srcLen, in.readDirect(srcLen)); } +void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in, IExpander *expander) +{ + bool compressed; + size32_t srcLen; + in.read(compressed).read(srcLen); + if (compressed) + decompressToBuffer(out, in.readDirect(srcLen), expander); + else + out.append(srcLen, in.readDirect(srcLen)); +} + void decompressToAttr(MemoryAttr & out, const void * src) { Owned expander = createLZWExpander(); diff --git a/system/jlib/jlzw.hpp b/system/jlib/jlzw.hpp index 1d7c7196390..48d9cdfd3b4 100644 --- a/system/jlib/jlzw.hpp +++ b/system/jlib/jlzw.hpp @@ -118,8 +118,10 @@ extern jlib_decl IRandRowExpander *createRandRDiffExpander(); // NB only support //Some helper functions to make it easy to compress/decompress to memorybuffers. +extern jlib_decl void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src, ICompressor *compressor); extern jlib_decl void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src); extern jlib_decl void decompressToBuffer(MemoryBuffer & out, const void * src); +extern jlib_decl void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in, IExpander *expander); extern jlib_decl void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in); extern jlib_decl void decompressToAttr(MemoryAttr & out, const void * src); extern jlib_decl void decompressToBuffer(MemoryAttr & out, MemoryBuffer & in); diff --git a/testing/unittests/unittests.cpp b/testing/unittests/unittests.cpp index a4c7e7e8ca6..a8b0545acec 100644 --- a/testing/unittests/unittests.cpp +++ b/testing/unittests/unittests.cpp @@ -1052,5 +1052,47 @@ class RelaxedAtomicTimingTest : public CppUnit::TestFixture CPPUNIT_TEST_SUITE_REGISTRATION( RelaxedAtomicTimingTest ); CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RelaxedAtomicTimingTest, "RelaxedAtomicTimingTest" ); +#include "jlzw.hpp" +class compressToBufferTest : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE( compressToBufferTest ); + CPPUNIT_TEST(testRun); + CPPUNIT_TEST_SUITE_END(); + + void testRun() + { + MemoryBuffer x; + compressToBuffer(x, 251, + "HelloHelloHelloHelloHelloHelloHelloHelloHelloHello" + "HelloHelloHelloHelloHelloHelloHelloHelloHelloHello" + "HelloHelloHelloHelloHelloHelloHelloHelloHelloHello" + "HelloHelloHelloHelloHelloHelloHelloHelloHelloHello" + "HelloHelloHelloHelloHelloHelloHelloHelloHelloHello" + ); + for (unsigned i = 0; i < x.length(); i++) + printf("%02x ", x.toByteArray()[i]); + printf("\n"); + * (byte *) x.toByteArray() = 2; + for (unsigned i = 0; i < x.length(); i++) + printf("%02x ", x.toByteArray()[i]); + printf("\n"); + try + { + MemoryBuffer out; + decompressToBuffer(out, x); + printf("%s\n", out.toByteArray()); + } + catch(IException *E) + { + StringBuffer s; + printf("Exception %s\n", E->errorMessage(s).str()); + ::Release(E); + } + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( compressToBufferTest ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( compressToBufferTest, "CompressToBufferTest" ); + #endif // _USE_CPPUNIT