diff --git a/devdoc/roxie.md b/devdoc/roxie.md index cfcbadf92b9..60a808be4c8 100644 --- a/devdoc/roxie.md +++ b/devdoc/roxie.md @@ -281,3 +281,17 @@ Should the scope of the blacklist be different? Possible scopes are: Options 2 and 4 above would allow all aspects of the blacklisting behaviour to be specified by options on the SOAPCALL. We could control whether or not the blacklister is to be used at all via a SOAPCALL option with any of the above... + +Some notes on LocalAgent mode +============================= + +In localAgent mode, the global queueManager object (normally a RoxieUdpSocketQueueManager) is replaced by a RoxieLocalQueueManager. Outbound packets are added directly to target queue, inbound are packed into DataBuffers. + +There is also "local optimizations" mode where any index operation reading a one-part file (does the same apply to one-part disk files?) just reads it directly on the server (regardless of localAgent setting). Typically still injected into receiver code though as otherwise handling exception cases, limits etc would all be duplicated/messy. Rows created in localOptimization mode are created directly in the caller's row manager, and are injected in serialized format. + +Why are inbound not created directly in the desired destination's allocator and then marked as serialized? Some lifespan issues... are they insurmountable? +We do pack into dataBuffers rather than MemoryBuffers, which avoids a need to copy the data before the receiver can use it. Large rows get split and will require copying again, but we could set dataBufferSize to be bigger in localAgent mode to mitigate this somewhat. + +What is the lifespan issue? In-flight queries may be abandoned when a server-side query fails, times out, or no longer needs the data. Using DataBuffer does not have this issue as they are attached to the query's memory manager/allocation once read. Or we could bypass the agent queue altogether, but rather more refactoring needed for that (might almost be easier to extent the "local optimization" mode to use multiple threads at that point) + +abortPending, replyPending, and abortPendingData methods are unimplemented, which may lead to some inefficiencies? diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index 7db1fdd4c16..a193b858a68 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -1585,6 +1585,11 @@ "default": 0, "description": "Specify an IONICE value for the background copy thread, if backgroundCopyClass set to best-effort." }, + "blockedLocalAgent": { + "type": "boolean", + "default": true, + "description": "Used DataBuffer blocks to return agent data in localAgent mode." + }, "callbackRetries": { "type": "integer", "default": 3, diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index f70b654b18a..a3acd99016a 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -300,6 +300,7 @@ extern IPropertyTree *topology; extern MapStringTo *preferredClusters; extern StringArray allQuerySetNames; +extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index c028d6dfada..7cd596372a0 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -68,6 +68,7 @@ unsigned numServerThreads = 30; unsigned numAgentThreads = 30; bool prestartAgentThreads = false; unsigned numRequestArrayThreads = 5; +bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned headRegionSize; unsigned ccdMulticastPort; @@ -756,8 +757,19 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) else setStatisticsComponentName(SCTroxie, "roxie", true); #ifdef _CONTAINERIZED - getDefaultStoragePlane(defaultPlane); - getDefaultIndexBuildStoragePlane(defaultIndexBuildPlane); + try + { + getDefaultStoragePlane(defaultPlane); + getDefaultIndexBuildStoragePlane(defaultIndexBuildPlane); + } + catch (IException *E) + { +#ifdef _DEBUG + E->Release(); // Useful for some local testing to be able to ignore these configuration errors +#else + throw; +#endif + } #endif installDefaultFileHooks(topology); @@ -966,6 +978,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) } minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize); + blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent); acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); headRegionSize = topology->getPropInt("@headRegionSize", 0); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 3c19f614c28..cc99b474ac3 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -3010,7 +3010,8 @@ unsigned CDummyMessagePacker::size() const interface ILocalMessageCollator : extends IMessageCollator { - virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0; + virtual bool attachDataBuffers(const ArrayOf &buffers) = 0; + virtual void enqueueMessage(bool outOfBand, unsigned totalSize, IMessageResult *result) = 0; }; interface ILocalReceiveManager : extends IReceiveManager @@ -3018,10 +3019,11 @@ interface ILocalReceiveManager : extends IReceiveManager virtual ILocalMessageCollator *lookupCollator(ruid_t id) = 0; }; - - -class LocalMessagePacker : public CDummyMessagePacker +class LocalMessagePacker : public CInterfaceOf { +protected: + unsigned lastput = 0; + MemoryBuffer data; MemoryBuffer meta; MemoryBuffer header; Linked rm; @@ -3029,13 +3031,173 @@ class LocalMessagePacker : public CDummyMessagePacker bool outOfBand; public: - IMPLEMENT_IINTERFACE; LocalMessagePacker(RoxiePacketHeader &_header, bool _outOfBand, ILocalReceiveManager *_rm) : rm(_rm), outOfBand(_outOfBand) { id = _header.uid; header.append(sizeof(RoxiePacketHeader), &_header); } + void * getBuffer(unsigned len, bool variable) override + { + if (variable) + { + char *ret = (char *) data.ensureCapacity(len + sizeof(RecordLengthType)); + return ret + sizeof(RecordLengthType); + } + else + { + return data.ensureCapacity(len); + } + } + + void putBuffer(const void *buf, unsigned len, bool variable) override + { + if (variable) + { + buf = ((char *) buf) - sizeof(RecordLengthType); + *(RecordLengthType *) buf = len; + len += sizeof(RecordLengthType); + } + data.setWritePos(lastput + len); + lastput += len; + } + + unsigned size() const + { + return lastput; + } + + virtual void flush() override; + + virtual void sendMetaInfo(const void *buf, unsigned len) override + { + meta.append(len, buf); + } + +}; + +class LocalBlockedMessagePacker : public CInterfaceOf +{ +protected: + unsigned lastput = 0; + MemoryBuffer meta; + MemoryBuffer header; + Linked rm; + ruid_t id; + bool outOfBand; + const unsigned dataBufferSize = DATA_PAYLOAD - sizeof(unsigned short); + bool packed = false; + unsigned tempBufferSize = 0; // MORE - use a MemoryBuffer? + void *tempBuffer = nullptr; + unsigned dataPos = 0; + unsigned bufferRemaining = 0; + DataBuffer *currentBuffer = nullptr; + ArrayOf buffers; + unsigned totalDataLen = 0; +public: + LocalBlockedMessagePacker(RoxiePacketHeader &_header, bool _outOfBand, ILocalReceiveManager *_rm) : rm(_rm), outOfBand(_outOfBand) + { + id = _header.uid; + header.append(sizeof(RoxiePacketHeader), &_header); + } + ~LocalBlockedMessagePacker() + { + free(tempBuffer); + } + + void * getBuffer(unsigned len, bool variable) override + { + if (variable) + len += sizeof(RecordLengthType); + if (dataBufferSize < len) + { + // Won't fit in one, so allocate temp location + // This code stolen from UDP layer - we could redo using a MemoryBuffer... + packed = false; + if (tempBufferSize < len) + { + free(tempBuffer); + tempBuffer = checked_malloc(len, ROXIE_MEMORY_ERROR); + tempBufferSize = len; + } + if (variable) + return ((char *) tempBuffer) + sizeof(RecordLengthType); + else + return tempBuffer; + } + else + { + // Will fit in one, though not necessarily the current one... + packed = true; + if (currentBuffer && (bufferRemaining < len)) + { + // Note that we never span records that are small enough to fit in one buffer - this can result in significant wastage if record just over DATA_PAYLOAD/2 + *(unsigned short *) ¤tBuffer->data = dataPos - sizeof(unsigned short); + buffers.append(currentBuffer); + currentBuffer = nullptr; + } + if (!currentBuffer) + { + currentBuffer = bufferManager->allocate(); + dataPos = sizeof (unsigned short); + bufferRemaining = dataBufferSize; + } + if (variable) + return ¤tBuffer->data[dataPos + sizeof(RecordLengthType)]; + else + return ¤tBuffer->data[dataPos]; + } + } + + void putBuffer(const void *buf, unsigned len, bool variable) override + { + if (variable) + { + assertex(len < MAX_RECORD_LENGTH); + buf = ((char *) buf) - sizeof(RecordLengthType); + *(RecordLengthType *) buf = len; + len += sizeof(RecordLengthType); + } + totalDataLen += len; + if (packed) + { + assert(len <= bufferRemaining); + dataPos += len; + bufferRemaining -= len; + } + else + { + while (len) + { + if (!currentBuffer) + { + currentBuffer = bufferManager->allocate(); + dataPos = sizeof (unsigned short); + bufferRemaining = dataBufferSize; + } + unsigned chunkLen = bufferRemaining; + if (chunkLen > len) + chunkLen = len; + memcpy(¤tBuffer->data[dataPos], buf, chunkLen); + dataPos += chunkLen; + len -= chunkLen; + buf = &(((char*)buf)[chunkLen]); + bufferRemaining -= chunkLen; + if (len) + { + *(unsigned short *) ¤tBuffer->data = dataPos - sizeof(unsigned short); + buffers.append(currentBuffer); + currentBuffer = nullptr; + } + } + } + } + + unsigned size() const + { + return lastput; + } + virtual void flush() override; virtual void sendMetaInfo(const void *buf, unsigned len) override @@ -3047,6 +3209,8 @@ class LocalMessagePacker : public CDummyMessagePacker class CLocalMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface { + // Note that the data is owned by the CLocalMessageCursor that created me, + // and will be released by it when it dies void *data; unsigned datalen; unsigned pos; @@ -3100,13 +3264,110 @@ class CLocalMessageUnpackCursor : implements IMessageUnpackCursor, public CInter } }; +class CLocalBlockedMessageUnpackCursor : implements IMessageUnpackCursor, public CInterface +{ + Linked rowManager; + const ArrayOf &buffers; // Owned by the CLocalBlockedMessageCursor that created me + const byte *currentBuffer = nullptr; + unsigned currentBufferRemaining = 0; + unsigned bufferIdx = 0; +public: + IMPLEMENT_IINTERFACE; + CLocalBlockedMessageUnpackCursor(IRowManager *_rowManager, const ArrayOf &_buffers) + : rowManager(_rowManager), buffers(_buffers) + { + if (buffers.length()) + { + currentBuffer = (const byte *) buffers.item(0).get()->data; + currentBufferRemaining = *(unsigned short *) currentBuffer; + currentBuffer += sizeof(unsigned short); + } + } + + ~CLocalBlockedMessageUnpackCursor() + { + } + + virtual bool atEOF() const + { + return currentBuffer != nullptr; + } + + virtual bool isSerialized() const + { + // NOTE: tempting to think that we could avoid serializing in localAgent case, but have to be careful about the lifespan of the rowManager... + return true; + } + + virtual const void * getNext(int length) + { + if (!currentBuffer) + return nullptr; + if ((currentBufferRemaining) >= (unsigned) length) + { + // Simple case - no need to copy + const void *res = currentBuffer; + currentBuffer += length; + currentBufferRemaining -= length; + checkNext(); + LinkRoxieRow(res); + return res; + } + char *currResLoc = (char*)rowManager->allocate(length, 0); + const void *res = currResLoc; + while (length && currentBuffer) + { + // Spans more than one block - allocate and copy + unsigned cpyLen = currentBufferRemaining; + if (cpyLen > (unsigned) length) cpyLen = length; + memcpy(currResLoc, currentBuffer, cpyLen); + length -= cpyLen; + currResLoc += cpyLen; + currentBuffer += cpyLen; + currentBufferRemaining -= cpyLen; + checkNext(); + } + assertex(!length); // fail if not enough data available + return res; + } + + virtual RecordLengthType *getNextLength() override + { + if (!currentBuffer) + return nullptr; + assertex (currentBufferRemaining >= sizeof(RecordLengthType)); + RecordLengthType *res = (RecordLengthType *) currentBuffer; + currentBuffer += sizeof(RecordLengthType); + currentBufferRemaining -= sizeof(RecordLengthType); + checkNext(); // Note that length is never separated from data... but length can be zero so still need to do this... + return res; + } + + void checkNext() + { + if (!currentBufferRemaining) + { + if (buffers.isItem(bufferIdx+1)) + { + bufferIdx++; + currentBuffer = (const byte *) &buffers.item(bufferIdx).get()->data; + currentBufferRemaining = *(unsigned short *) currentBuffer; + currentBuffer += sizeof(unsigned short); + } + else + { + currentBuffer = nullptr; + } + } + } +}; + class CLocalMessageResult : implements IMessageResult, public CInterface { void *data; void *meta; void *header; unsigned datalen, metalen, headerlen; - unsigned pos; public: IMPLEMENT_IINTERFACE; CLocalMessageResult(void *_data, unsigned _datalen, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen) @@ -3117,7 +3378,6 @@ class CLocalMessageResult : implements IMessageResult, public CInterface data = _data; meta = _meta; header = _header; - pos = 0; } ~CLocalMessageResult() @@ -3150,6 +3410,52 @@ class CLocalMessageResult : implements IMessageResult, public CInterface }; +class CLocalBlockedMessageResult : implements IMessageResult, public CInterface +{ + ArrayOf buffers; + void *meta; + void *header; + unsigned metalen, headerlen; +public: + IMPLEMENT_IINTERFACE; + CLocalBlockedMessageResult(ArrayOf &_buffers, void *_meta, unsigned _metalen, void *_header, unsigned _headerlen) + { + buffers.swapWith(_buffers); + metalen = _metalen; + headerlen = _headerlen; + meta = _meta; + header = _header; + } + + ~CLocalBlockedMessageResult() + { + free(meta); + free(header); + } + + virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const + { + return new CLocalBlockedMessageUnpackCursor(rowMgr, buffers); + } + + virtual const void *getMessageHeader(unsigned &length) const + { + length = headerlen; + return header; + } + + virtual const void *getMessageMetadata(unsigned &length) const + { + length = metalen; + return meta; + } + + virtual void discard() const + { + } + +}; + class CLocalMessageCollator : implements ILocalMessageCollator, public CInterface { InterruptableSemaphore sem; @@ -3159,6 +3465,7 @@ class CLocalMessageCollator : implements ILocalMessageCollator, public CInterfac Linked receiveManager; ruid_t id; unsigned totalBytesReceived; + bool memLimitExceeded = false; public: IMPLEMENT_IINTERFACE; @@ -3172,6 +3479,11 @@ class CLocalMessageCollator : implements ILocalMessageCollator, public CInterfac virtual IMessageResult* getNextResult(unsigned time_out, bool &anyActivity) { + if (memLimitExceeded) + { + DBGLOG("LocalCollator: CLocalMessageCollator::getNextResult() throwing memory limit exceeded exception"); + throw MakeStringException(0, "memory limit exceeded"); + } anyActivity = false; if (!sem.wait(time_out)) return NULL; @@ -3185,15 +3497,32 @@ class CLocalMessageCollator : implements ILocalMessageCollator, public CInterfac sem.interrupt(E); } - virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) + virtual void enqueueMessage(bool outOfBand, unsigned totalSize, IMessageResult *result) override { CriticalBlock c(crit); if (outOfBand) - pending.enqueueHead(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen)); + pending.enqueueHead(result); else - pending.enqueue(new CLocalMessageResult(data, datalen, meta, metalen, header, headerlen)); + pending.enqueue(result); sem.signal(); - totalBytesReceived += datalen + metalen + headerlen; + totalBytesReceived += totalSize; + } + + virtual bool attachDataBuffers(const ArrayOf &buffers) override + { + if (memLimitExceeded) + return false; + ForEachItemIn(idx, buffers) + { + roxiemem::OwnedDataBuffer dataBuffer = buffers.item(idx); + if (!dataBuffer.get()->attachToRowMgr(rowManager)) + { + memLimitExceeded = true; + interrupt(MakeStringException(0, "memory limit exceeded")); + return(false); + } + } + return true; } virtual unsigned queryBytesReceived() const @@ -3259,7 +3588,27 @@ void LocalMessagePacker::flush() unsigned datalen = data.length(); unsigned metalen = meta.length(); unsigned headerlen = header.length(); - collator->enqueueMessage(outOfBand, data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen); + collator->enqueueMessage(outOfBand, datalen+metalen+headerlen, new CLocalMessageResult(data.detach(), datalen, meta.detach(), metalen, header.detach(), headerlen)); + } + // otherwise Roxie server is no longer interested and we can simply discard +} + +void LocalBlockedMessagePacker::flush() +{ + if (currentBuffer && (dataPos > sizeof(unsigned short))) + { + *(unsigned short *) ¤tBuffer->data = dataPos - sizeof(unsigned short); + buffers.append(currentBuffer); + currentBuffer = nullptr; + } + Owned collator = rm->lookupCollator(id); + if (collator) + { + unsigned metalen = meta.length(); + unsigned headerlen = header.length(); + // NOTE - takes ownership of buffers and leaves it empty + if (collator->attachDataBuffers(buffers)) + collator->enqueueMessage(outOfBand, totalDataLen+metalen+headerlen, new CLocalBlockedMessageResult(buffers, meta.detach(), metalen, header.detach(), headerlen)); } // otherwise Roxie server is no longer interested and we can simply discard } @@ -3380,7 +3729,10 @@ class RoxieLocalQueueManager : public RoxieReceiverBase virtual IMessagePacker *createOutputStream(RoxiePacketHeader &header, bool outOfBand, const IRoxieContextLogger &logctx) override { - return new LocalMessagePacker(header, outOfBand, receiveManager); + if (blockedLocalAgent) + return new LocalBlockedMessagePacker(header, outOfBand, receiveManager); + else + return new LocalMessagePacker(header, outOfBand, receiveManager); } virtual IReceiveManager *queryReceiveManager() override diff --git a/roxie/roxiemem/roxiemem.hpp b/roxie/roxiemem/roxiemem.hpp index 4a671c0efe3..d5b3cd1bdbf 100644 --- a/roxie/roxiemem/roxiemem.hpp +++ b/roxie/roxiemem/roxiemem.hpp @@ -418,6 +418,30 @@ class OwnedRoxieString const char * ptr; }; +class OwnedDataBuffer +{ +public: + inline OwnedDataBuffer() { ptr = NULL; } + inline OwnedDataBuffer(DataBuffer * _ptr) { ptr = _ptr; } + inline OwnedDataBuffer(const OwnedDataBuffer & other) { ptr = other.getLink(); } + + inline ~OwnedDataBuffer() { if (ptr) ptr->Release(); } + + inline operator DataBuffer *() const { return ptr; } + inline DataBuffer * get() const { return ptr; } + inline DataBuffer * getLink() const { if (ptr) ptr->Link(); return ptr; } + inline DataBuffer * set(DataBuffer * _ptr) { DataBuffer * temp = ptr; if (_ptr) _ptr->Link(); ptr = _ptr; if (temp) temp->Release(); return ptr; } + inline DataBuffer * setown(DataBuffer * _ptr) { DataBuffer * temp = ptr; ptr = _ptr; if (temp) temp->Release(); return ptr; } + inline void clear() { DataBuffer * temp = ptr; ptr = NULL; if (temp) temp->Release(); } +private: + /* Disable use of some constructs that often cause memory leaks by creating private members */ + void operator = (const void * _ptr) { } + void operator = (const OwnedDataBuffer & other) { } + void setown(const OwnedDataBuffer &other) { } + +private: + DataBuffer * ptr; +}; interface IRowHeap : extends IInterface { diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index f9e230c7d06..c832c506540 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -81,6 +81,7 @@ class UDPLIB_API ServerIdentifier }; extern UDPLIB_API ServerIdentifier myNode; +extern UDPLIB_API roxiemem::IDataBufferManager *bufferManager; interface IMessagePacker : extends IInterface { diff --git a/roxie/udplib/udpsha.hpp b/roxie/udplib/udpsha.hpp index b2dfa39f5e4..be41365e205 100644 --- a/roxie/udplib/udpsha.hpp +++ b/roxie/udplib/udpsha.hpp @@ -28,8 +28,6 @@ typedef unsigned sequence_t; #define SEQF -extern roxiemem::IDataBufferManager *bufferManager; - typedef bool (*PKT_CMP_FUN) (const void *pkData, const void *key);