Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.4.26

Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
ghalliday committed Jan 16, 2024
2 parents 3d4e923 + 71d4b75 commit aab4332
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 48 deletions.
7 changes: 5 additions & 2 deletions common/thorhelper/roxierow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,12 @@ class CAllocatorCache : public CSimpleInterfaceOf<IRowAllocatorMetaActIdCache>
CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
if (container)
{
if (0 == (roxiemem::RHFunique & flags))
if (0 == ((roxiemem::RHFunique|roxiemem::RHFblocked) & flags))
return LINK(&container->queryElement());
// if in cache but unique, reuse allocatorId
// If in cache but unique, reuse allocatorId, but create a unique allocator (and heap)
// If blocked the allocator must not be commoned up! (The underlying heap will be within roxiemem.)
// This is very unusual, but can happen if a library is used more than once within the same query
// since you will have multiple activity instances with the same activityId.
SpinUnblock b(allAllocatorsLock);
return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
}
Expand Down
7 changes: 4 additions & 3 deletions dali/base/dafdesc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3856,9 +3856,10 @@ class CStoragePlaneInfo : public CInterfaceOf<IStoragePlane>
Owned<IPropertyTreeIterator> srcAliases = xml->getElements("aliases");
ForEach(*srcAliases)
aliases.push_back(new CStoragePlaneAlias(&srcAliases->query()));
Owned<IPropertyTreeIterator> srcHosts = xml->getElements("hosts");
ForEach(*srcHosts)
hosts.emplace_back(srcHosts->query().queryProp(nullptr));
StringArray planeHosts;
getPlaneHosts(planeHosts, xml);
ForEachItemIn(h, planeHosts)
hosts.emplace_back(planeHosts.item(h));
}

virtual const char * queryPrefix() const override { return xml->queryProp("@prefix"); }
Expand Down
13 changes: 6 additions & 7 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
#endif
configurePreferredPlanes();
createDelayedReleaser();
CCycleTimer loadPackageTimer;
globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
globalPackageSetManager->load();
if (traceLevel)
DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs());

ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit);
ROQ->setHeadRegionSize(headRegionSize);
ROQ->start();
Expand All @@ -1428,13 +1434,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)

EnableSEHtoExceptionMapping();
setSEHtoExceptionHandler(&abortHandler);

CCycleTimer loadPackageTimer;
globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
globalPackageSetManager->load();
if (traceLevel)
DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs());

Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
if (runOnce)
{
Expand Down
36 changes: 30 additions & 6 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
Thread::start();
started.wait();
}

~receive_data()
{
DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
Expand Down Expand Up @@ -1336,20 +1336,44 @@ class CReceiveManager : implements IReceiveManager, public CInterface
try
{
unsigned int res;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
while (true)
{
receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout);
if (res!=sizeof(UdpRequestToSendMsg))
break;
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
// a UdpPacketHeader which has a 2 byte length. This length must be > sizeof(UdpPacketHeader).
//Since max_flow_cmd < sizeof(UdpPacketHeader) this can be used to distinguish a true data packet(!)
static_assert(flowType::max_flow_cmd < sizeof(UdpPacketHeader)); // assert to check the above comment is correct

if (hdr.length >= sizeof(UdpPacketHeader))
{
if (res == hdr.length)
break;

//Very rare situation - log it so that there is some evidence that it is occurring
OWARNLOG("Received partial network packet - %u bytes out of %u received", res, hdr.length);

//Because we are reading UDP datgrams rather than tcp packets, if we failed to read the whole datagram
//the rest of the datgram is lost - you cannot call readtms to read the rest of the datagram.
//Therefore throw this incomplete datagram away and allow the resend mechanism to retransmit it.
continue;
}

//Sanity check
assertex(res == sizeof(UdpRequestToSendMsg));

//Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data
//Redirect them to the flow thread to process them.
selfFlowSocket->write(b->data, res);
}

dataPacketsReceived++;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
assert(hdr.length == res && hdr.length > sizeof(hdr));
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
Expand Down
2 changes: 1 addition & 1 deletion roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer);
header->length = encryptBuffer.length();
encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating
assert(length <= DATA_PAYLOAD);
assertex(encryptBuffer.length() <= DATA_PAYLOAD);
data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
}
else
Expand Down
2 changes: 2 additions & 0 deletions rtl/eclrtl/eclhelper_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ const char * CThorSoapActionArg::getXpathHintsXml() { return nullptr;}
const char * CThorSoapActionArg::getRequestHeader() { return nullptr; }
const char * CThorSoapActionArg::getRequestFooter() { return nullptr; }
unsigned CThorSoapActionArg::getPersistMaxRequests() { return 0; }
unsigned CThorSoapActionArg::getPersistPoolSize() { return 0; }

//CThorSoapCallArg

Expand All @@ -646,6 +647,7 @@ const char * CThorSoapCallArg::getXpathHintsXml() { return nullptr; }
const char * CThorSoapCallArg::getRequestHeader() { return nullptr; }
const char * CThorSoapCallArg::getRequestFooter() { return nullptr; }
unsigned CThorSoapCallArg::getPersistMaxRequests() { return 0; }
unsigned CThorSoapCallArg::getPersistPoolSize() { return 0; }

size32_t CThorSoapCallArg::onFailTransform(ARowBuilder & rowBuilder, const void * left, IException * e) { return 0; }
void CThorSoapCallArg::getLogText(size32_t & lenText, char * & text, const void * left) { lenText =0; text = NULL; }
Expand Down
2 changes: 2 additions & 0 deletions rtl/include/eclhelper_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ class ECLRTL_API CThorSoapActionArg : public CThorSinkArgOf<IHThorSoapActionArg>
virtual const char * getRequestHeader() override;
virtual const char * getRequestFooter() override;
virtual unsigned getPersistMaxRequests() override;
unsigned getPersistPoolSize(); // Included for backward compatibility to prevent linking problems
};

class ECLRTL_API CThorSoapCallArg : public CThorArgOf<IHThorSoapCallArg>
Expand Down Expand Up @@ -854,6 +855,7 @@ class ECLRTL_API CThorSoapCallArg : public CThorArgOf<IHThorSoapCallArg>
virtual const char * getRequestHeader() override;
virtual const char * getRequestFooter() override;
virtual unsigned getPersistMaxRequests() override;
unsigned getPersistPoolSize(); // Included for backward compatibility to prevent linking problems
};

typedef CThorSoapCallArg CThorHttpCallArg;
Expand Down
3 changes: 0 additions & 3 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,8 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface

virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const
{
// IO Stats coming from the keyCursor and jhtree cache stats coming from this class's stats
if (keyCursor)
keyCursor->mergeStats(targetStats); // merge IO stats
if (ctx)
targetStats.merge(ctx->queryStats()); // merge jhtree cache stats
}
};

Expand Down
8 changes: 5 additions & 3 deletions system/jlib/jencrypt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,8 @@ MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *plaintext,
encryptError("Unsupported key length");
break;
}
byte *ciphertext = (byte *) output.reserve(plaintext_len + 100);
unsigned originalLen = output.length();
byte *ciphertext = (byte *) output.reserve(plaintext_len + 16);
int ciphertext_len = 0;
int thislen = 0;
if(1 != EVP_EncryptUpdate(ctx, ciphertext, &thislen, (const unsigned char *) plaintext, plaintext_len))
Expand All @@ -1880,7 +1881,7 @@ MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *plaintext,
encryptError("Error in EVP_EncryptFinal_ex");
ciphertext_len += thislen;
EVP_CIPHER_CTX_free(ctx);
output.setLength(ciphertext_len);
output.setLength(originalLen + ciphertext_len);
return output;
}
catch (...)
Expand Down Expand Up @@ -1928,6 +1929,7 @@ MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *ciphertext,
decryptError("Unsupported key length");
break;
}
unsigned originalLen = output.length();
byte *plaintext = (byte *) output.reserve(ciphertext_len);
if(1 != EVP_DecryptUpdate(ctx, plaintext, &thislen, (const unsigned char *) ciphertext, ciphertext_len))
decryptError("Error in EVP_DecryptUpdate");
Expand All @@ -1936,7 +1938,7 @@ MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *ciphertext,
if(1 != EVP_DecryptFinal_ex(ctx, plaintext + plaintext_len, &thislen))
decryptError("Error in EVP_DecryptFinal_ex");
plaintext_len += thislen;
output.setLength(plaintext_len);
output.setLength(originalLen + plaintext_len);
EVP_CIPHER_CTX_free(ctx);
return output;
}
Expand Down
2 changes: 1 addition & 1 deletion system/security/securesocket/securesocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ class CSecureSocketContext : implements ISecureSocketContext, implements ISecure
Owned<CStringSet> m_peers;
StringAttr password;
CriticalSection cs;
Owned<const ISyncedPropertyTree> syncedConfig;
Linked<const ISyncedPropertyTree> syncedConfig;
unsigned configVersion = 0;

void setSessionIdContext()
Expand Down
4 changes: 3 additions & 1 deletion testing/regress/ecl/embedpy3-catch.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//skip type==thorlcr TBD

IMPORT Python3;
IMPORT Std.Str;

integer testThrow(integer val) := EMBED(Python3)
raise Exception('Error from Python')
Expand All @@ -32,8 +33,9 @@ ENDEMBED;
d := dataset([{ 1, '' }], { integer a, string m} ) : stored('nofold');

d t := transform
eol := Str.Find(FAILMESSAGE, '\n');
self.a := FAILCODE;
self.m := FAILMESSAGE;
self.m := IF(eol != 0, FAILMESSAGE[1..eol-1], FAILMESSAGE);
self := [];
end;

Expand Down
60 changes: 41 additions & 19 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3778,49 +3778,71 @@ class JLibOpensslAESTest : public CppUnit::TestFixture
0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33,
0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31
};
constexpr const char * prefix = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
constexpr unsigned lenPrefix = strlen(prefix);

MemoryBuffer ciphertext1, ciphertext2, decrypted1, decrypted2;

ciphertext1.append(lenPrefix, prefix);
ciphertext2.append(lenPrefix, prefix);
openssl::aesEncrypt(key, 32, intext, len, ciphertext1);
jlib::aesEncrypt(key, 32, intext, len, ciphertext2);

CPPUNIT_ASSERT(memcmp(ciphertext1.bytes(), prefix, lenPrefix) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext2.bytes(), prefix, lenPrefix) == 0);
if (len)
CPPUNIT_ASSERT(ciphertext1.length() > len + lenPrefix);
else
CPPUNIT_ASSERT(ciphertext1.length() == len + lenPrefix);
CPPUNIT_ASSERT(ciphertext1.length() <= len + lenPrefix + 16);
CPPUNIT_ASSERT(ciphertext1.length()==ciphertext2.length());
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes(), ciphertext2.bytes(), ciphertext1.length()) == 0);

unsigned cipherlen = ciphertext1.length() - lenPrefix;

/* Decrypt the ciphertext */
openssl::aesDecrypt(key, 32, ciphertext1.bytes(), ciphertext1.length(), decrypted1);
assert(decrypted1.length() == len);
CPPUNIT_ASSERT(decrypted1.length() == len);
CPPUNIT_ASSERT(memcmp(decrypted1.bytes(), intext, len) == 0);
decrypted1.append(lenPrefix, prefix);
openssl::aesDecrypt(key, 32, ciphertext1.bytes()+lenPrefix, cipherlen, decrypted1);
CPPUNIT_ASSERT(decrypted1.length() == len+lenPrefix);
CPPUNIT_ASSERT(memcmp(decrypted1.bytes(), prefix, lenPrefix) == 0);
CPPUNIT_ASSERT(memcmp(decrypted1.bytes()+lenPrefix, intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes(), ciphertext2.bytes(), ciphertext1.length()) == 0); // check input unchanged

jlib::aesDecrypt(key, 32, ciphertext2.bytes(), ciphertext2.length(), decrypted2);
CPPUNIT_ASSERT(decrypted2.length() == len);
CPPUNIT_ASSERT(memcmp(decrypted2.bytes(), intext, len) == 0);
decrypted2.append(lenPrefix, prefix);
jlib::aesDecrypt(key, 32, ciphertext2.bytes()+lenPrefix, cipherlen, decrypted2);
CPPUNIT_ASSERT(memcmp(decrypted2.bytes(), prefix, lenPrefix) == 0);
CPPUNIT_ASSERT(decrypted2.length() == len + lenPrefix);
CPPUNIT_ASSERT(memcmp(decrypted2.bytes() + lenPrefix, intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes(), ciphertext2.bytes(), ciphertext1.length()) == 0); // check input unchanged

// Now test in-place decrypt
unsigned cipherlen = ciphertext1.length();
ciphertext1.append(4, "XXXX"); // Marker
unsigned decryptedlen = openssl::aesDecryptInPlace(key, 32, (void *) ciphertext1.bytes(), cipherlen);
unsigned decryptedlen = openssl::aesDecryptInPlace(key, 32, (void *)(ciphertext1.bytes() + lenPrefix), cipherlen);
CPPUNIT_ASSERT(decryptedlen == len);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes(), intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes()+cipherlen, "XXXX", 4) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes()+lenPrefix, intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext1.bytes()+lenPrefix+cipherlen, "XXXX", 4) == 0);

cipherlen = ciphertext2.length();
ciphertext2.append(4, "XXXX"); // Marker
decryptedlen = jlib::aesDecryptInPlace(key, 32, (void *) ciphertext2.bytes(), cipherlen);
decryptedlen = jlib::aesDecryptInPlace(key, 32, (void *)(ciphertext2.bytes() + lenPrefix), cipherlen);
CPPUNIT_ASSERT(decryptedlen == len);
CPPUNIT_ASSERT(memcmp(ciphertext2.bytes(), intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext2.bytes()+cipherlen, "XXXX", 4) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext2.bytes()+lenPrefix, intext, len) == 0);
CPPUNIT_ASSERT(memcmp(ciphertext2.bytes()+lenPrefix+cipherlen, "XXXX", 4) == 0);
}

void test()
{
/* Message to be encrypted */
const char *plaintext = "The quick brown fox jumps over the lazy dog";
for (unsigned l = 0; l < strlen(plaintext); l++)
testOne(l, plaintext);
try
{
/* Message to be encrypted */
const char *plaintext = "The quick brown fox jumps over the lazy dog";
for (unsigned l = 0; l < strlen(plaintext); l++)
testOne(l, plaintext);
}
catch (IException * e)
{
EXCLOG(e, "Exception in AES unit test");
throw;
}
}

};
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1409,10 +1409,14 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
if (isSuper)
{
unsigned subfile = subFileNum[i];
keyManager->mergeStats(*fileStats[startOffset+subfile]);
keyManager->mergeStats(*fileStats[startOffset+subfile]); // merge IO stats
fileStats[startOffset+subfile]->merge(contextLoggers[i]->queryStats()); // merge jhtreeStats
}
else
keyManager->mergeStats(*fileStats[startOffset]);
{
keyManager->mergeStats(*fileStats[startOffset]); // merge IO stats
fileStats[startOffset]->merge(contextLoggers[i]->queryStats()); // merge jhtreeStats
}
}
}
};
Expand Down

0 comments on commit aab4332

Please sign in to comment.