Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.6.x

Signed-off-by: Jake Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
jakesmith committed Mar 21, 2024
2 parents 16bee08 + caa0a1c commit 0c4aa45
Show file tree
Hide file tree
Showing 21 changed files with 168 additions and 83 deletions.
29 changes: 14 additions & 15 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11215,23 +11215,22 @@ class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements I
Owned<IPropertyTree> tree = getNamedPropTree(sroot,queryDfsXmlBranchName(DXB_File),"@name",tail.str(),false);
if (tree)
{
if (isContainerized())
// This is for bare-metal clients using ~foreign pointing at a containerized/k8s setup,
// asking for the returned meta data to be remapped to point to the dafilesrv service.
if (isContainerized() && hasMask(opts, GetFileTreeOpts::remapToService))
{
// This is for bare-metal clients using ~foreign pointing at a containerized/k8s setup,
// asking for the returned meta data to be remapped to point to the dafilesrv service.
if (hasMask(opts, GetFileTreeOpts::remapToService))
{
tree.setown(createPTreeFromIPT(tree)); // copy live Dali tree, because it is about to be altered by remapGroupsToDafilesrv
remapGroupsToDafilesrv(tree, true, secureService);
groupResolver = nullptr; // do not attempt to resolve remapped group (it will not exist and cause addUnique to create a new anon one)

const char *remotePlaneName = tree->queryProp("@group");
Owned<IPropertyTree> filePlane = getStoragePlane(remotePlaneName);
assertex(filePlane);
// Used by DFS clients to determine if stripe and/or alias translation needed
tree->setPropTree("Attr/_remoteStoragePlane", createPTreeFromIPT(filePlane));
}
tree.setown(createPTreeFromIPT(tree)); // copy live Dali tree, because it is about to be altered by remapGroupsToDafilesrv
remapGroupsToDafilesrv(tree, true, secureService);
groupResolver = nullptr; // do not attempt to resolve remapped group (it will not exist and cause addUnique to create a new anon one)

const char *remotePlaneName = tree->queryProp("@group");
Owned<IPropertyTree> filePlane = getStoragePlane(remotePlaneName);
assertex(filePlane);
// Used by DFS clients to determine if stripe and/or alias translation needed
tree->setPropTree("Attr/_remoteStoragePlane", createPTreeFromIPT(filePlane));
}
else
tree->removeProp("Attr/_remoteStoragePlane");

Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(tree,groupResolver,IFDSF_EXCLUDE_CLUSTERNAMES);
mb.append((int)1); // 1 == standard file
Expand Down
23 changes: 13 additions & 10 deletions dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,20 @@ class CFileCloner
// for now, only use source file descriptor as cloned source if it's from
// wsdfs file backed by remote storage using dafilesrv (NB: if it is '_remoteStoragePlane' will be set)
// JCSMORE: it may be this can replace the need for the other 'clone*' attributes altogether.
if (srcfdesc->queryProperties().hasProp("_remoteStoragePlane"))
if (srcfdesc->queryProperties().hasProp("_remoteStoragePlane") && srcdali && !srcdali->endpoint().isNull())
{
if (srcdali && !srcdali->endpoint().isNull())
{
attrs.setPropTree("cloneFromFDesc", createPTreeFromIPT(srcTree));
StringBuffer host;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(host).str());
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());
return;
}
attrs.setPropTree("cloneFromFDesc", createPTreeFromIPT(srcTree));
StringBuffer host;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(host).str());
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());
return;
}
else
{
attrs.removeProp("cloneFromFDesc");
attrs.removeProp("@cloneFrom");
attrs.removeProp("@cloneFromPrefix");
}

while(attrs.removeProp("cloneFromGroup"));
Expand Down
3 changes: 3 additions & 0 deletions ecl/eclagent/eclagentmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ int main(int argc, const char *argv[])
try
{
ret = eclagent_main(argc, argv);
//Do not return a non-zero error code in containerized mode - otherwise the system will think it failed to run
if (isContainerized())
ret = 0;
}
catch (IException *E)
{
Expand Down
14 changes: 12 additions & 2 deletions esp/src/src-react/components/ECLArchive.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as React from "react";
import { CommandBar, ContextualMenuItemType, ICommandBarItemProps } from "@fluentui/react";
import { WUDetails, IScope } from "@hpcc-js/comms";
import { Workunit, WUDetails, IScope } from "@hpcc-js/comms";
import { scopedLogger } from "@hpcc-js/util";
import nlsHPCC from "src/nlsHPCC";
import { useWorkunitArchive } from "../hooks/workunit";
import { useWorkunitMetrics } from "../hooks/metrics";
Expand All @@ -12,6 +13,8 @@ import { ECLArchiveTree } from "./ECLArchiveTree";
import { ECLArchiveEditor } from "./ECLArchiveEditor";
import { MetricsPropertiesTables } from "./MetricsPropertiesTables";

const logger = scopedLogger("src-react/components/ECLArchive.tsx");

const scopeFilterDefault: WUDetails.RequestNS.ScopeFilter = {
MaxDepth: 999999,
ScopeTypes: ["graph"]
Expand Down Expand Up @@ -54,8 +57,15 @@ export const ECLArchive: React.FunctionComponent<ECLArchiveProps> = ({
setSelectionText(archive?.content(selection) ?? "");
setMarkers(archive?.markers(selection) ?? []);
setSelectedMetrics(archive?.metrics(selection) ?? []);
} else {
if (archive && !archive.build) {
const wu = Workunit.attach({ baseUrl: "" }, wuid);
wu.fetchQuery().then(function (query) {
setSelectionText(query?.Text ?? "");
}).catch(err => logger.error(err));
}
}
}, [archive, metrics.length, selection]);
}, [archive, metrics.length, selection, wuid]);

const setSelectedItem = React.useCallback((selId: string) => {
pushUrl(`${parentUrl}/${selId}`);
Expand Down
4 changes: 2 additions & 2 deletions esp/src/src-react/components/forms/Fields.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ const AsyncDropdown: React.FunctionComponent<AsyncDropdownProps> = ({
}, [onChange, selectedItem, selectedIdx, selectedKey]);

return options === undefined ?
<DropdownBase label={label} options={[]} placeholder={nlsHPCC.loadingMessage} disabled={true} /> :
<DropdownBase label={label} options={selOptions} selectedKey={selectedItem?.key} onChange={(_, item: IDropdownOption) => setSelectedItem(item)} placeholder={placeholder} disabled={disabled} required={required} errorMessage={errorMessage} className={className} />;
<DropdownBase label={label} dropdownWidth="auto" options={[]} placeholder={nlsHPCC.loadingMessage} disabled={true} /> :
<DropdownBase label={label} dropdownWidth="auto" options={selOptions} selectedKey={selectedItem?.key} onChange={(_, item: IDropdownOption) => setSelectedItem(item)} placeholder={placeholder} disabled={disabled} required={required} errorMessage={errorMessage} className={className} />;
};

interface DropdownMultiProps {
Expand Down
17 changes: 12 additions & 5 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1235,11 +1235,14 @@ class CRemoteDiskBaseActivity : public CSimpleInterfaceOf<IRemoteReadActivity>,
}
virtual void serializeCursor(MemoryBuffer &tgt) const override
{
throwUnexpected();
// we need to serialize something, because the lack of a cursor is used to signify end of stream
// NB: the cursor is opaque and only to be consumed by dafilesrv. When used it is simply passed back.
tgt.append("UNSUPPORTED");
}
virtual void restoreCursor(MemoryBuffer &src) override
{
throwUnexpected();
throw makeStringExceptionV(0, "restoreCursor not supported in: %s", typeid(*this).name());
throwUnimplemented();
}
virtual void flushStatistics(CClientStats &stats) override
{
Expand Down Expand Up @@ -2019,11 +2022,13 @@ class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
public:
CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
{
xpath.set("/");
if (customRowTag.isEmpty()) // no override
fileDesc->queryProperties().getProp("@rowTag", xpath);
else
{
xpath.set("/");
xpath.append(customRowTag);
}
}
};

Expand Down Expand Up @@ -2392,11 +2397,13 @@ class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
}
virtual void serializeCursor(MemoryBuffer &tgt) const override
{
throwUnexpected();
// we need to serialize something, because the lack of a cursor is used to signify end of stream
// NB: the cursor is opaque and only to be consumed by dafilesrv. When used it is simply passed back.
tgt.append("UNSUPPORTED");
}
virtual void restoreCursor(MemoryBuffer &src) override
{
throwUnexpected();
throw makeStringExceptionV(0, "restoreCursor not supported in: %s", typeid(*this).name());
}
virtual StringBuffer &getInfoStr(StringBuffer &out) const override
{
Expand Down
6 changes: 5 additions & 1 deletion roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,11 @@ inline unsigned getBondedChannel(unsigned partNo)
extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
extern unsigned getNextInstanceId();
extern void closedown();
extern void saveTopology();
extern void saveTopology(bool lockDali);
extern unsigned __int64 getTopologyHash();

extern unsigned __int64 currentTopologyHash;
extern unsigned __int64 originalTopologyHash;

#define LOGGING_INTERCEPTED 0x01
#define LOGGING_TIMEACTIVITIES 0x02
Expand Down
21 changes: 17 additions & 4 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,18 @@ int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const uns
}
#endif

void saveTopology()
void saveTopology(bool lockDali)
{
// Write back changes that have been made via certain control:xxx changes, so that they survive a roxie restart
// Write back changes that have been made via control:(un)lockDali changes, so that they survive a roxie restart
// Note that they are overwritten when Roxie is manually stopped/started via hpcc-init service - these changes
// are only intended to be temporary for the current session
if (!useOldTopology)
return;
try
{
saveXML(topologyFile.str(), topology);
Owned<IPTree> tempTopology = createPTreeFromXMLFile(topologyFile.str(), ipt_caseInsensitive);
tempTopology->setPropBool("@lockDali", lockDali);
saveXML(topologyFile.str(), tempTopology);
}
catch (IException *E)
{
Expand Down Expand Up @@ -461,6 +463,16 @@ static std::vector<std::pair<unsigned, unsigned>> agentChannels;

void *leakChecker = nullptr; // Used to deliberately leak an allocation to ensure leak checking is working

unsigned __int64 currentTopologyHash = 0;
unsigned __int64 originalTopologyHash = 0;

hash64_t getTopologyHash()
{
StringBuffer xml;
toXML(topology, xml, 0, XML_SortTags);
return rtlHash64Data(xml.length(), xml.str(), 707018);
}

#ifndef _CONTAINERIZED
void readStaticTopology()
{
Expand Down Expand Up @@ -695,7 +707,8 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
useOldTopology = checkFileExists(topologyFile.str());
topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
saveTopology();
saveTopology(topology->getPropBool("@lockDali", false));
originalTopologyHash = currentTopologyHash = getTopologyHash();

// Any settings we read from topology that must NOT be overridden in workunit debug fields should be read at this point, before the following section
getAllowedPipePrograms(allowedPipePrograms, true);
Expand Down
14 changes: 14 additions & 0 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,8 @@ struct PingRecord
{
unsigned tick;
IpAddress senderIP;
unsigned __int64 currentTopoHash;
unsigned __int64 originalTopoHash;
};

void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
Expand All @@ -1059,6 +1061,16 @@ void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
StringBuffer s;
throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(s).str());
}
if (originalTopologyHash != data->originalTopoHash)
{
StringBuffer s;
EXCLOG(MCoperatorError,"ERROR: Configuration file mismatch detected with Roxie server %s", header.toString(s).str());
}
if (currentTopologyHash != data->currentTopoHash)
{
StringBuffer s;
DBGLOG("WARNING: Temporary configuration mismatch detected with Roxie server %s", header.toString(s).str());
}
RoxiePacketHeader newHeader(header, ROXIE_PING, 0); // subchannel not relevant
Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
void *ret = output->getBuffer(contextLength, false);
Expand Down Expand Up @@ -3887,6 +3899,8 @@ class PingTimer : public Thread
PingRecord data;
data.senderIP.ipset(myNode.getIpAddress());
data.tick = usTick();
data.originalTopoHash = originalTopologyHash;
data.currentTopoHash = currentTopologyHash;
mb.append(sizeof(PingRecord), &data);
if (doTrace(traceRoxiePings))
DBGLOG("PING sent");
Expand Down
20 changes: 7 additions & 13 deletions roxie/ccd/ccdstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2580,7 +2580,7 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
topology->setPropBool("@lockDali", true);
if (daliHelper)
daliHelper->disconnect();
saveTopology();
saveTopology(true);
}
else if (stricmp(queryName, "control:logfullqueries")==0)
{
Expand Down Expand Up @@ -2825,13 +2825,13 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
reply.appendf("<Dali connected='1'/>");
else
reply.appendf("<Dali connected='0'/>");
unsigned __int64 thash = getTopologyHash();
unsigned __int64 shash;
{
ReadLockBlock readBlock(packageCrit);
shash = allQueryPackages->queryHash();
}
reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u'/>", shash, thash);
reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u' originalTopologyHash='%" I64F "u'/>",
shash, currentTopologyHash, originalTopologyHash);
}
else if (stricmp(queryName, "control:resetcache")==0)
{
Expand Down Expand Up @@ -2936,13 +2936,13 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
reply.appendf("<Dali connected='1'/>");
else
reply.appendf("<Dali connected='0'/>");
unsigned __int64 thash = getTopologyHash();
unsigned __int64 shash;
{
ReadLockBlock readBlock(packageCrit);
shash = allQueryPackages->queryHash();
}
reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u'/>", shash, thash);
reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u' originalTopologyHash='%" I64F "u'/>",
shash, currentTopologyHash, originalTopologyHash);
}
else if (stricmp(queryName, "control:steppingEnabled")==0)
{
Expand Down Expand Up @@ -3029,7 +3029,7 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
{
topology->setPropBool("@lockDali", false);
// Dali will reattach via the timer that checks every so often if can reattach...
saveTopology();
saveTopology(false);
}
else if (stricmp(queryName, "control:unsuspend")==0)
{
Expand Down Expand Up @@ -3068,6 +3068,7 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
unknown = true;
break;
}
currentTopologyHash = getTopologyHash();
if (unknown)
throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
}
Expand All @@ -3076,13 +3077,6 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme
{
throw MakeStringException(ROXIE_INVALID_INPUT, "Badly formated control query");
}

hash64_t getTopologyHash()
{
StringBuffer xml;
toXML(topology, xml, 0, XML_SortTags);
return rtlHash64Data(xml.length(), xml.str(), 707018);
}
};

extern IRoxieQueryPackageManagerSet *createRoxiePackageSetManager(const IQueryDll *standAloneDll)
Expand Down
11 changes: 9 additions & 2 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2441,7 +2441,7 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
virtual IKeyIndex *queryPart(unsigned idx) { return idx ? NULL : this; }
virtual unsigned queryScans() { return realKey ? realKey->queryScans() : 0; }
virtual unsigned querySeeks() { return realKey ? realKey->querySeeks() : 0; }
virtual const char *queryFileName() { return keyfile.get(); }
virtual const char *queryFileName() const { return keyfile.get(); }
virtual offset_t queryBlobHead() { return checkOpen().queryBlobHead(); }
virtual void resetCounts() { if (realKey) realKey->resetCounts(); }
virtual offset_t queryLatestGetNodeOffset() const { return realKey ? realKey->queryLatestGetNodeOffset() : 0; }
Expand Down Expand Up @@ -2659,7 +2659,14 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD,
if (!ownedCacheEntry->isReady())
{
const CJHTreeNode *node = keyIndex->loadNode(&fetchCycles, pos);
assertex(type == node->getNodeType());
if (unlikely(type != node->getNodeType()))
{
//This should never happen, but if it does, report as much information as possible to diagnose the issue.
StringBuffer msg;
msg.appendf("Node type mismatch for node %s@%llx (expected %s, got %s)", keyIndex->queryFileName(), pos, cacheTypeText[type], cacheTypeText[node->getNodeType()]);
node->Release();
throwUnexpectedX(msg);
}

//Update the associated size of the entry in the hash table before setting isReady (never evicted until isReady is set)
curCache.noteReady(*node);
Expand Down
2 changes: 1 addition & 1 deletion system/jhtree/jhtree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ interface jhtree_decl IKeyIndex : public IKeyIndexBase
virtual unsigned querySeeks() = 0;
virtual size32_t keyedSize() = 0;
virtual bool hasPayload() = 0;
virtual const char *queryFileName() = 0;
virtual const char *queryFileName() const = 0;
virtual offset_t queryBlobHead() = 0;
virtual void resetCounts() = 0;
virtual offset_t queryLatestGetNodeOffset() const = 0;
Expand Down
Loading

0 comments on commit 0c4aa45

Please sign in to comment.