HPCC-32017 New compressing splitter implementation #18738

Merged

Member

@jakesmith jakesmith commented Jun 7, 2024

Type of change:

  • This change is a bug fix (non-breaking change which fixes an issue).
  • This change is a new feature (non-breaking change which adds functionality).
  • This change improves the code (refactor or other change that does not change the functionality)
  • This change fixes warnings (the fix does not alter the functionality or the generated code)
  • This change is a breaking change (fix or feature that will cause existing behavior to change).
  • This change alters the query API (existing queries will have to be recompiled)

Checklist:

  • My code follows the code style of this project.
    • My code does not create any new warnings from compiler, build system, or lint.
  • The commit message is properly formatted and free of typos.
    • The commit message title makes sense in a changelog, by itself.
    • The commit is signed.
  • My change requires a change to the documentation.
    • I have updated the documentation accordingly, or...
    • I have created a JIRA ticket to update the documentation.
    • Any new interfaces or exported functions are appropriately commented.
  • I have read the CONTRIBUTORS document.
  • The change has been fully tested:
    • I have added tests to cover my changes.
    • All new and existing tests passed.
    • I have checked that this change does not introduce memory leaks.
    • I have used Valgrind or similar tools to check for potential issues.
  • I have given due consideration to all of the following potential concerns:
    • Scalability
    • Performance
    • Security
    • Thread-safety
    • Cloud-compatibility
    • Premature optimization
    • Existing deployed queries will not be broken
    • This change fixes the problem, not just the symptom
    • The target branch of this pull request is appropriate for such a change.
  • There are no similar instances of the same problem that should be addressed
    • I have addressed them here
    • I have raised JIRA issues to address them separately
  • This is a user interface / front-end modification
    • I have tested my changes in multiple modern browsers
    • The component(s) render as expected

Smoketest:

  • Send notifications about my Pull Request position in Smoketest queue.
  • Test my draft Pull Request.

Testing:

github-actions bot commented Jun 7, 2024

Jira Issue: https://hpccsystems.atlassian.net//browse/HPCC-32017

Jirabot Action Result:
Workflow Transition: Merge Pending
Updated PR
Assigning user: [email protected]

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch 3 times, most recently from 65d377e to 13194ad Compare June 7, 2024 10:54
@AttilaVamos
Contributor

There are a couple of cores generated in Regression Setup:

[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Core was generated by `./thorslave_mythor --master=10.22.254.225:20000 --slave=.:20120 --slavenum=2 --'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x00007fd7f87331e5 in CFileSerialInputStream::read(unsigned int, void*) () from /opt/HPCCSystems/lib/libjlib.so
[Current thread is 1 (Thread 0x7fd6dedfc700 (LWP 8891))]
...
Thread 1 (Thread 0x7fd6dedfc700 (LWP 8891)):
#0  0x00007fd7f87331e5 in CFileSerialInputStream::read(unsigned int, void*) () from /opt/HPCCSystems/lib/libjlib.so
#1  0x00007fd7f87350ca in CDecompressingSerialInputStream::read(unsigned int, void*) () from /opt/HPCCSystems/lib/libjlib.so
#2  0x00007fd7f8734840 in CBlockedSerialInputStream::get(unsigned int, void*) () from /opt/HPCCSystems/lib/libjlib.so
#3  0x00007fd7faf24a2f in CThorStreamDeserializerSource::read(unsigned int, void*) () from /opt/HPCCSystems/lib/libeclrtl.so
#4  0x00007fd7f0189acc in ?? () from /var/lib/HPCCSystems/queries/mythor_20120/V904511781_libW20240607-112339-5.so
#5  0x00007fd7fdcbcf07 in CSharedFullSpillingWriteAhead::COutputRowStream::getRowFromStream() () from /opt/HPCCSystems/lib/libgraph_lcr.so
#6  0x00007fd7fdac8729 in CSplitterOutput::nextRow() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#7  0x00007fd7fdacbfa8 in non-virtual thunk to CParseSlaveActivity::nextRow() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#8  0x00007fd7f872091d in CRowStreamMerger::pullInput(unsigned int) () from /opt/HPCCSystems/lib/libjlib.so
#9  0x00007fd7f871cb06 in createRowStreamMerger(unsigned int, IRowStream**, ICompare*, bool, IRowLinkCounter*) () from /opt/HPCCSystems/lib/libjlib.so
#10 0x00007fd7fdab5fbb in GlobalMergeSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#11 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#12 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#13 0x00007fd7fdac9c04 in NSplitterSlaveActivity::prepareInput() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#14 0x00007fd7fdac85dc in CSplitterOutput::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#15 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#16 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#17 0x00007fd7fda2705f in GroupSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#18 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#19 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#20 0x00007fd7fd9f131d in non-virtual thunk to GroupAggregateSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#21 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#22 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#23 0x00007fd7fdac9c04 in NSplitterSlaveActivity::prepareInput() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#24 0x00007fd7fdac85dc in CSplitterOutput::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#25 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#26 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#27 0x00007fd7fdac8279 in non-virtual thunk to NormalizeSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#28 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#29 0x00007fd7fdab5ef5 in GlobalMergeSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#30 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#31 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#32 0x00007fd7fdab978c in MSortSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#33 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#34 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#35 0x00007fd7fda2705f in GroupSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#36 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#37 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#38 0x00007fd7fda50c47 in non-virtual thunk to GroupIterateSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#39 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#40 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#41 0x00007fd7fd9ffae2 in non-virtual thunk to CDegroupSlaveActivity::start() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#42 0x00007fd7fdd710b7 in CSlaveActivity::startInput(unsigned int, char const*) () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#43 0x00007fd7fdd711b3 in CSlaveActivity::start() () from /opt/HPCCSystems/lib/libgraphslave_lcr.so
#44 0x00007fd7fdaeb05a in CDiskWriteSlaveActivityBase::open() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#45 0x00007fd7fdaec1dd in CDiskWriteSlaveActivityBase::process() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#46 0x00007fd7fd9ebb62 in ProcessSlaveActivity::threadmain() () from /opt/HPCCSystems/lib/libactivityslaves_lcr.so
#47 0x00007fd7f873ec4c in CThreadedPersistent::threadmain() () from /opt/HPCCSystems/lib/libjlib.so
#48 0x00007fd7f8743fc0 in non-virtual thunk to CThreadedPersistent::CAThread::run() () from /opt/HPCCSystems/lib/libjlib.so
#49 0x00007fd7f873fb21 in Thread::begin() () from /opt/HPCCSystems/lib/libjlib.so
#50 0x00007fd7f873eb99 in Thread::_threadmain(void*) () from /opt/HPCCSystems/lib/libjlib.so
#51 0x00007fd7f659bea5 in start_thread () from /lib64/libpthread.so.0
#52 0x00007fd7f5830b0d in clone () from /lib64/libc.so.6

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch from 13194ad to 6a7b96f Compare June 7, 2024 12:55
{
Owned<IFileIO> iFileIO = iFile->open(IFOread);
Owned<ISerialInputStream> in = createSerialInputStream(iFileIO);
Owned<IBufferedSerialInputStream> inputStream = createBufferedInputStream(in, readAheadSize, 0);
Member

Should probably be the preferred block size from the storage plane

const char *options = nullptr;
Owned<ICompressor> compressor = compressHandler->getCompressor(options);
Owned<ISerialOutputStream> compressed = createCompressingOutputStream(outputStream, compressor);
constexpr size32_t compressedSize = 0x100000; // JCSMORE configurable/what is ideal size?
Member

compressionBlockSize - should pass the same constant to the decompressor

ActPrintLog(&activity, "Spilling to temp storage [file = %s]", iFile->queryFilename());
Owned<IFileIO> io = iFile->open(IFOcreate);
Owned<ISerialOutputStream> out = createSerialOutputStream(io);
outputStream.setown(createBufferedOutputStream(out, writeAheadSize));
Member

This should be the preferred block size for the temp plane.

}
bool checkWriteAhead(rowcount_t &outputRowsAvailable)
{
CriticalBlock b(crit);
Member

Better to code as:

if (totalRowsWritten == outputRowsAvailable)
{
    CriticalBlock b(crit);
    if (totalRowsWritten == outputRowsAvailable)
    {

with an atomic variable for totalRowsWritten
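
For reference, a minimal standalone sketch of the check / lock / re-check pattern suggested above. It uses std::mutex and std::atomic in place of jlib's CriticalSection and CriticalBlock. The class name WriteAheadState and the fetchMore callback are hypothetical and only stand in for whatever reads more rows; this is not the code in thbuf.cpp.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for the shared write-ahead state (illustrative only).
class WriteAheadState
{
public:
    // Returns true if rows beyond outputRowsAvailable exist, and updates it.
    // fetchMore() is only called, under the lock, when the caller has caught up.
    template <typename FetchMore>
    bool checkWriteAhead(std::uint64_t &outputRowsAvailable, FetchMore fetchMore)
    {
        // Fast path: a lagging caller never touches the lock.
        std::uint64_t written = totalRowsWritten.load(std::memory_order_acquire);
        if (written == outputRowsAvailable)
        {
            std::lock_guard<std::mutex> guard(lock);
            // Re-check: another thread may have fetched rows while we waited.
            written = totalRowsWritten.load(std::memory_order_relaxed);
            if (written == outputRowsAvailable)
            {
                written += fetchMore();
                totalRowsWritten.store(written, std::memory_order_release);
            }
        }
        bool more = (written != outputRowsAvailable);
        outputRowsAvailable = written;
        return more;
    }

private:
    std::mutex lock;
    std::atomic<std::uint64_t> totalRowsWritten{0};
};
```

The point of the outer test is that an output which is behind the writer returns without contending on the lock; only an output that has caught up pays for it.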

thorlcr/thorutil/thbuf.cpp
}
bool getRowsInMem(Rows &outputRows, rowcount_t &outputRowsAvailable)
{
CriticalBlock b(crit);
Member

This will also block other threads. Avoiding that will be a bit tricky - but a circular buffer would make it easier.
Reallocation also makes it tricky
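
To illustrate why a circular buffer helps, here is a minimal fixed-capacity ring for one producer and one consumer. It is only a sketch of the basic idea: the splitter has several outputs reading the same rows, so it would need one read cursor per output, which is not shown. SpscRing and its method names are hypothetical. Because the storage is fixed, there is no reallocation for readers to race against.

```cpp
#include <array>
#include <atomic>
#include <cstddef>

// Hypothetical single-producer / single-consumer ring buffer (illustrative only).
template <typename T, std::size_t Capacity>
class SpscRing
{
    static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of two");
public:
    // Producer side: returns false when the ring is full.
    bool tryPush(const T &value)
    {
        std::size_t t = tail.load(std::memory_order_relaxed);
        std::size_t h = head.load(std::memory_order_acquire);
        if (t - h == Capacity)
            return false;
        slots[t % Capacity] = value;
        tail.store(t + 1, std::memory_order_release); // publish the slot
        return true;
    }

    // Consumer side: returns false when the ring is empty.
    bool tryPop(T &value)
    {
        std::size_t h = head.load(std::memory_order_relaxed);
        std::size_t t = tail.load(std::memory_order_acquire);
        if (h == t)
            return false;
        value = slots[h % Capacity];
        head.store(h + 1, std::memory_order_release); // release the slot
        return true;
    }

private:
    std::array<T, Capacity> slots{};
    std::atomic<std::size_t> head{0}; // next slot to read
    std::atomic<std::size_t> tail{0}; // next slot to write
};
```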

size32_t sz = thorRowMemoryFootprint(serializer, row);
rows.emplace_back(row, sz);
rowsMemUsage += sz;
if (rowsMemUsage >= maxRowMem)
Member

You might want to use a different condition here. I.e. read in 256K chunks until you get to 1MB

Maybe 64 rows or 128K whichever is hit first
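
A small sketch of the "64 rows or 128K, whichever is hit first" condition, assuming row sizes are already known. ReadAheadLimits and nextBatchRows are hypothetical names for illustration, not part of the PR.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical read-ahead limits; the defaults mirror the numbers in the comment above.
struct ReadAheadLimits
{
    std::size_t maxBytes = 128 * 1024;
    std::uint64_t maxRows = 64;
};

// Returns how many rows, starting at index `first`, form the next read-ahead batch.
// The batch ends as soon as either the row limit or the byte limit is reached.
inline std::size_t nextBatchRows(const std::vector<std::size_t> &rowSizes,
                                 std::size_t first,
                                 const ReadAheadLimits &limits)
{
    std::size_t bytes = 0;
    std::size_t rows = 0;
    while (first + rows < rowSizes.size())
    {
        bytes += rowSizes[first + rows];
        ++rows;
        if (rows >= limits.maxRows || bytes >= limits.maxBytes)
            break;
    }
    return rows;
}
```

With small rows the row count bounds the batch, and with large rows the byte count does, so the time spent holding the lock per batch stays bounded either way.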

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch 3 times, most recently from 1c31c3b to be4d061 Compare June 11, 2024 08:14
@jakesmith jakesmith requested a review from ghalliday June 11, 2024 08:14
@jakesmith
Member Author

@ghalliday - I think ready for full review (though I've left a 'test' commit in for now).

Member

@ghalliday ghalliday left a comment

@jakesmith some initial comments. I will reread again

}
virtual void endNested(size32_t sizePos) override
{
size32_t nestedSize = nestedSizes.back();
Member

This logic will not work for datasets nested within datasets. I need to add a tell() function to the output (that was coming in my next iteration anyway). You then need something along the lines of:

size32_t beginNested(size32_t count)
{
   output->suspend(sizeof(size32_t));
   if (nesting++ == 0)
      outerNestingOffset = output->tell();
   return output->tell()-outerNestingOffset;
}

void endNested(size32_t delta)
{
   size32_t patchedLength = output->tell() + delta - outerNestingOffset;
   output->resume(sizeof(size32_t), &patchedLength);
   nesting--;
}
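
As a standalone illustration of the bookkeeping involved, here is a minimal in-memory sketch of back-patching length prefixes for nested datasets. It keeps the whole output in a std::vector, so any placeholder can be patched at any time; a streaming target cannot assume that, which appears to be what the suspend/resume calls above are for. NestedBufferTarget is a hypothetical class, not the HPCC IRowSerializerTarget implementation.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical in-memory serializer target (illustrative only).
class NestedBufferTarget
{
public:
    void put(std::uint32_t len, const void *ptr)
    {
        const std::uint8_t *p = static_cast<const std::uint8_t *>(ptr);
        buffer.insert(buffer.end(), p, p + len);
    }

    // Reserves a 4-byte length placeholder and returns its position.
    std::uint32_t beginNested()
    {
        std::uint32_t pos = static_cast<std::uint32_t>(buffer.size());
        buffer.resize(buffer.size() + sizeof(std::uint32_t));
        return pos;
    }

    // Patches the placeholder at sizePos with the number of bytes written after it.
    // Each nesting level carries its own position, so inner and outer datasets
    // are patched independently.
    void endNested(std::uint32_t sizePos)
    {
        std::uint32_t nestedSize =
            static_cast<std::uint32_t>(buffer.size() - (sizePos + sizeof(std::uint32_t)));
        std::memcpy(buffer.data() + sizePos, &nestedSize, sizeof(nestedSize));
    }

    const std::vector<std::uint8_t> &data() const { return buffer; }

private:
    std::vector<std::uint8_t> buffer;
};
```

Note that the outer length includes the inner dataset's 4-byte prefix as well as its payload.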

Member Author

ok, let me know when you push your update.

thorlcr/thorutil/thbuf.cpp
return rowBuilder.finalizeRowClear(sz);
}
public:
rowcount_t lastKnownWritten = 0;
Member

It is confusing that this is not with the other member variables. It should probably be private with an accessor function.

Member Author

agree. Have added get/set methods.

memsize_t rowsMemUsage = 0;
std::atomic<rowcount_t> totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read
rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen
CriticalSection crit;
Member

What is this protecting? Worth giving it a more descriptive name.

Member Author

have renamed/added comment.

thorlcr/thorutil/thbuf.cpp
}
void createOutputStream()
{
// NB: this flushes existing unread rows from memory. Called once, when spilling starts.
Member

I don't think it does anything with unread rows. It affects what happens next to rows.

Member Author

stale comment, from when it did. Changed.

}
}
}
virtual void stop() override
Member

stop() should indicate it is not going to read any more rows (e.g., set lastKnownWritten to -1), otherwise an output that only reads 100 records and then stops will cause another output to spill.

Member Author

It does, via a call to owner.outputStopped. It does it that way because it needs to do so within readAheadCS.
Have added a comment.
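
A minimal sketch of why a stopped output must be excluded when deciding whether to spill, assuming each output publishes the row position it has reached and a stopped output publishes the maximum value (the "-1" suggested above, seen as an unsigned count). The names outputStopped, lowestActivePosition and mustSpill are hypothetical and do not correspond to the owner.outputStopped call in the PR.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Sentinel meaning "this output will read no more rows" (an assumption for this sketch).
constexpr std::uint64_t outputStopped = std::numeric_limits<std::uint64_t>::max();

// Returns the lowest row position still needed by any active output,
// or outputStopped if every output has stopped.
inline std::uint64_t lowestActivePosition(const std::vector<std::uint64_t> &positions)
{
    std::uint64_t lowest = outputStopped;
    for (std::uint64_t pos : positions)
        lowest = std::min(lowest, pos); // stopped outputs never lower the minimum
    return lowest;
}

// True if the rows retained for the slowest active output exceed the in-memory limit.
// Assumes every active position is <= totalRowsRead.
inline bool mustSpill(const std::vector<std::uint64_t> &positions,
                      std::uint64_t totalRowsRead,
                      std::uint64_t maxRowsInMemory)
{
    std::uint64_t lowest = lowestActivePosition(positions);
    if (lowest == outputStopped)
        return false; // nobody left to keep rows for
    return totalRowsRead - lowest > maxRowsInMemory;
}
```

If an output that stopped after 100 rows kept reporting position 100, it would look like a very slow reader and force the other outputs to spill; publishing the sentinel removes it from the minimum.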

return rowBuilder.finalizeRowClear(sz);
}
public:
rowcount_t lastKnownWritten = 0;
Member

lastKnownAvailable would be a better name to cover both in-memory and spilled rows.

Member Author

agree, renamed.

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch 4 times, most recently from aebec19 to 2f0827d Compare June 13, 2024 10:26
eof = true;
}
};
class COutputStreamSerializer : public CSimpleInterfaceOf<IRowSerializerTarget>
Member

Can you move this to same place as CThorStreamDeserializerSource - so it can be reused in the unit tests?

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch from c8ac137 to 28cfe90 Compare June 14, 2024 10:18
struct SharedRowStreamReaderOptions
{
offset_t storageBlockSize = 256 * 1024; // block size of read/write streams
memsize_t compressionBlockSize = 256 * 1024; // compression buffer size of read streams
Member

Not sure if this should be 1024 as it is elsewhere.

memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins.
memsize_t inMemReadAheadGranularity = 128 * 1024; // granularity (K) of read ahead
rowcount_t inMemReadAheadGranularityRows = 64; // granularity (rows) of read ahead. NB: whichever granularity is hit first
offset_t spillWriteAheadSize = 4000 * 1024; // once spilling, maximum size to write ahead
Member

I don't think this needs to be this high (as long as it is slightly less than a multiple of compressionBlockSize), but I'm not sure what effect varying it will have. Might be worth experimenting.

@jakesmith jakesmith force-pushed the HPCC-32017-newsplitter branch from 0db080a to 2116ab4 Compare June 14, 2024 15:14
@ghalliday ghalliday merged commit fcb0440 into hpcc-systems:candidate-9.6.x Jun 14, 2024
20 of 21 checks passed