Skip to content

Commit

Permalink
HPCC-27843 CLI support for specifying remote storage instead of DALI
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Fishbeck <[email protected]>
  • Loading branch information
afishbeck committed Sep 25, 2023
1 parent 85f7c98 commit d048f94
Show file tree
Hide file tree
Showing 30 changed files with 540 additions and 195 deletions.
2 changes: 2 additions & 0 deletions common/pkgfiles/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ include_directories (
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
${HPCC_SOURCE_DIR}/common/workunit
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand All @@ -58,6 +59,7 @@ if(NOT PLUGIN)
target_link_libraries(
pkgfiles
dfuwu
ws_dfsclient
)
if (NOT CONTAINERIZED)
target_link_libraries(pkgfiles environment)
Expand Down
2 changes: 1 addition & 1 deletion common/pkgfiles/pkgimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PKGFILES_API CPackageNode : implements IHpccPackage, public CInterface

// Appends to `xpath` a predicate of the form SuperFile[@id='<name>'] where <name> is
// `superFileName`, lower-cased, after skipForeignOrRemote() has been applied to it
// (presumably dropping a leading foreign/remote scope prefix - confirm against its definition).
// Returns the StringBuffer reference produced by the append chain.
inline StringBuffer &makeSuperFileXPath(StringBuffer &xpath, const char *superFileName) const
{
    // One normalisation pass only: the earlier skipForeign() call that used to precede this
    // line is replaced by skipForeignOrRemote(), whose signature carries both an ip and a
    // remote out-parameter.
    superFileName = skipForeignOrRemote(superFileName);
    return xpath.append("SuperFile[@id='").appendLower(strlen(superFileName), superFileName).append("']");
}

Expand Down
317 changes: 232 additions & 85 deletions common/pkgfiles/referencedfilelist.cpp

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions common/pkgfiles/referencedfilelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
// Flag values attached to a referenced file; each value is a distinct single bit, so
// they appear intended to be OR-ed together.
// NOTE(review): several names below are defined more than once with different values
// (for example RefFileSuper as 0x0020 and again as 0x0080, RefFileCloned as 0x0100 and
// 0x0400). This looks like the removed and added sides of a change shown together -
// confirm which set is current before relying on any numeric value.
#define RefFileIndex 0x0001
#define RefFileNotOnCluster 0x0002
#define RefFileNotFound 0x0004
#define RefFileRemote 0x0008
#define RefFileForeign 0x0010
#define RefFileSuper 0x0020
#define RefSubFile 0x0040
#define RefFileCopyInfoFailed 0x0080
#define RefFileCloned 0x0100
#define RefFileInPackage 0x0200
#define RefFileNotOnSource 0x0400
#define RefFileOptional 0x0800 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x1000
// The names from here on distinguish how a file was resolved (Resolved*) from how its
// logical file name was written (LFN*), going by the names and the inline comments.
#define RefFileResolvedForeign 0x0008
#define RefFileResolvedRemote 0x0010
#define RefFileLFNForeign 0x0020 //LFN was Foreign
#define RefFileLFNRemote 0x0040 //LFN was remote
#define RefFileSuper 0x0080
#define RefSubFile 0x0100
#define RefFileCopyInfoFailed 0x0200
#define RefFileCloned 0x0400
#define RefFileInPackage 0x0800
#define RefFileNotOnSource 0x1000
#define RefFileOptional 0x2000 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x4000


interface IReferencedFile : extends IInterface
Expand Down Expand Up @@ -70,14 +72,14 @@ interface IReferencedFileList : extends IInterface
virtual void addFiles(StringArray &files)=0;

virtual IReferencedFileIterator *getFiles()=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage)=0;
virtual void cloneAllInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneFileInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneRelationships()=0;
virtual void setDfuQueue(const char *dfu_queue) = 0;
};

extern REFFILES_API const char *skipForeign(const char *name, StringBuffer *ip=NULL);
extern REFFILES_API const char *skipForeignOrRemote(const char *name, StringBuffer *ip=nullptr, StringBuffer *remote=nullptr);

extern REFFILES_API IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
extern REFFILES_API IReferencedFileList *createReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
Expand Down
1 change: 1 addition & 0 deletions dali/datest/datest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include_directories (
${HPCC_SOURCE_DIR}/system/jlib
${HPCC_SOURCE_DIR}/system/security/shared
${HPCC_SOURCE_DIR}/esp/clients/wsdfuaccess
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
${HPCC_SOURCE_DIR}/fs/dafsstream
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
Expand Down
2 changes: 2 additions & 0 deletions dali/datest/dfuwutest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include_directories (
./../../system/jlib
./../../common/workunit
../../common/environment
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

HPCC_ADD_EXECUTABLE ( dfuwutest ${SRCS} )
Expand All @@ -50,6 +51,7 @@ target_link_libraries ( dfuwutest
dafsclient
dalibase
dfuwu
ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand Down
14 changes: 8 additions & 6 deletions dali/dfu/dfurun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ class CDFUengine: public CInterface, implements IDFUengine
}

// first see if target exists (and remove if does and overwrite specified)
Owned<IDistributedFile> dfile = wsdfs::lookup(dlfn,ctx.user,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE);
Owned<IDistributedFile> dfile = wsdfs::lookup(dlfn,ctx.user,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false);
if (dfile) {
if (!ctx.superoptions->getOverwrite())
throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
Expand Down Expand Up @@ -1297,6 +1297,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Owned<INode> foreigndalinode;
StringAttr oldRoxiePrefix;
bool foreigncopy = false;
bool remotecopy = false;
// first check for 'specials' (e.g. multi-cluster keydiff etc)
switch (cmd) {
case DFUcmd_copy:
Expand All @@ -1311,6 +1312,7 @@ class CDFUengine: public CInterface, implements IDFUengine
CDfsLogicalFileName srclfn;
if (tmp.length())
srclfn.set(tmp.str());
remotecopy = srclfn.isRemote();
destination->getLogicalName(tmp.clear());
CDfsLogicalFileName dstlfn;
if (tmp.length())
Expand Down Expand Up @@ -1363,7 +1365,7 @@ class CDFUengine: public CInterface, implements IDFUengine
}
srcFile.setown(wsdfs::lookup(tmp.str(),userdesc,
(cmd==DFUcmd_move)||(cmd==DFUcmd_rename)||((cmd==DFUcmd_copy)&&multiclusterinsert) ? AccessMode::tbdWrite : AccessMode::tbdRead,
false,false,nullptr,true, INFINITE));
false,false,nullptr,true, INFINITE,false));

if (!srcFile)
throw MakeStringException(-1,"Source file %s could not be found",tmp.str());
Expand Down Expand Up @@ -1526,7 +1528,7 @@ class CDFUengine: public CInterface, implements IDFUengine
}
else if (multiclustermerge)
{
dstFile.setown(wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE));
dstFile.setown(wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false));
if (!dstFile)
throw MakeStringException(-1,"Destination for merge %s does not exist",tmp.str());
StringBuffer err;
Expand All @@ -1535,7 +1537,7 @@ class CDFUengine: public CInterface, implements IDFUengine
}
else
{
Owned<IDistributedFile> oldfile = wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE);
Owned<IDistributedFile> oldfile = wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false);
if (oldfile)
{
StringBuffer reason;
Expand Down Expand Up @@ -1666,7 +1668,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Audit("COPYDIFF",userdesc,srcName.get(),dstName.get());
}
}
else if (foreigncopy||auxfdesc)
else if (remotecopy||foreigncopy||auxfdesc)
{
IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : srcFdesc.get());
fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
Expand Down Expand Up @@ -1741,7 +1743,7 @@ class CDFUengine: public CInterface, implements IDFUengine
destination->getLogicalName(toname);
if (toname.length()) {
unsigned start = msTick();
Owned<IDistributedFile> newfile = wsdfs::lookup(toname.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE);
Owned<IDistributedFile> newfile = wsdfs::lookup(toname.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false);
if (newfile) {
// check for rename into multicluster
CDfsLogicalFileName dstlfn;
Expand Down
97 changes: 69 additions & 28 deletions dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rmtfile.hpp"
#include "dfuutil.hpp"

#include "ws_dfsclient.hpp"

// savemap
// superkey functions
// (logical) directory functions
Expand Down Expand Up @@ -129,6 +131,7 @@ class CFileCloner
{
public:
StringAttr nameprefix;
StringAttr remoteStorage;
Owned<INode> foreigndalinode;
Linked<IUserDescriptor> userdesc;
Linked<IUserDescriptor> foreignuserdesc;
Expand Down Expand Up @@ -409,33 +412,41 @@ class CFileCloner
void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
{
DBGLOG("updateCloneFrom %s", lfn);
if (!srcdali || srcdali->endpoint().isNull())
if (remoteStorage.isEmpty() && (!srcdali || srcdali->endpoint().isNull()))
attrs.setProp("@cloneFromPeerCluster", srcCluster);
else
{
while(attrs.removeProp("cloneFromGroup"));

StringBuffer s;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
if (!remoteStorage.isEmpty())
{
attrs.setProp("@cloneRemote", remoteStorage.str());
if (!isEmptyString(srcCluster))
attrs.setProp("@cloneRemoteCluster", srcCluster);
}
else
{
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
if (srcCluster && *srcCluster) //where to copy from has been explicity set to a remote location, don't copy from local sources
attrs.setProp("@cloneFromPeerCluster", "-");
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());

while(attrs.removeProp("cloneFromGroup"));

unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
}
void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
Expand Down Expand Up @@ -570,6 +581,7 @@ class CFileCloner
const char *_cluster2,
IUserDescriptor *_userdesc,
const char *_foreigndali,
const char *_remoteStorage,
IUserDescriptor *_foreignuserdesc,
const char *_nameprefix,
bool _overwrite,
Expand All @@ -590,6 +602,8 @@ class CFileCloner
level = 0;
if (_foreigndali&&*_foreigndali)
foreigndalinode.setown(createINode(_foreigndali,DALI_SERVER_PORT));
if (_remoteStorage && *_remoteStorage)
remoteStorage.set(_remoteStorage);
fdir = &queryDistributedFileDirectory();
switch(_clustmap) {
case DFUcpdm_c_replicated_by_d:
Expand Down Expand Up @@ -833,6 +847,8 @@ class CFileCloner
else
{
StringBuffer s;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneRemote"), remoteStorage.str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getEndpointHostText(s).str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir()))
Expand Down Expand Up @@ -876,20 +892,44 @@ class CFileCloner
srcLFN.clearForeign();
srcdali.setown(createINode(ep));
}

StringBuffer s;
Owned<IPropertyTree> ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign);
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
IPropertyTree *attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
Owned<IPropertyTree> ftree;
Owned<IPropertyTree> filePlane;
IPropertyTree *attsrc = nullptr;
if (remoteStorage || srcLFN.isRemote())
{
StringBuffer remoteLFN;
if (!srcLFN.isRemote())
remoteLFN.append("remote::").append(remoteStorage).append("::");
srcLFN.get(remoteLFN);

Owned<wsdfs::IDFSFile> dfsFile = wsdfs::lookupDFSFile(remoteLFN.str(), AccessMode::readSequential, INFINITE, wsdfs::keepAliveExpiryFrequency, foreignuserdesc);
IPropertyTree *tree = (dfsFile) ? dfsFile->queryFileMeta() : nullptr;
if (tree)
ftree.setown(tree->getPropTree("File"));
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Remote Storage", remoteLFN.str()); //remote scope already included in remoteLFN
const char *remotePlaneName = ftree->queryProp("@group");
VStringBuffer planeXPath("planes[@name=\"%s\"]", remotePlaneName);
filePlane.set(dfsFile->queryCommonMeta()->queryPropTree(planeXPath));
}
else
{
ftree.setown(fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign));
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
}

CDfsLogicalFileName dlfn;
dlfn.set(destfilename);
if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File)))
throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s));

if (!srcdali.get()||queryCoven().inCoven(srcdali))
if (!remoteStorage.length() && (!srcdali.get() || queryCoven().inCoven(srcdali)))
{
// if dali is local and filenames same
if (streq(srcLFN.get(), dlfn.get()))
Expand Down Expand Up @@ -1241,7 +1281,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
)
{
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
CDfsLogicalFileName dlfn;
cloner.cloneSuperFile(srcname,dlfn);
}
Expand All @@ -1263,7 +1303,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
{
DBGLOG("createSingleFileClone src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=%d", srcname, srcCluster, dstname, cluster1, prefix, overwrite, dophysicalcopy);
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.srcCluster.set(srcCluster);
cloner.prefix.set(prefix);
cloner.cloneFile(srcname,dstname);
Expand All @@ -1280,6 +1320,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
const char *defReplicateFolder,
IUserDescriptor *userdesc, // user desc for local dali
const char *foreigndali, // can be omitted if srcname foreign or local
const char *remoteStorage, // can be omitted if srcname remote or local
unsigned overwriteFlags, // overwrite destination if exists
bool dophysicalcopy
)
Expand All @@ -1288,7 +1329,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
CFileCloner cloner;
// MORE: Would the following be better to ensure files are copied when queries are deployed?
// bool copyPhysical = isContainerized() && (foreigndali != nullptr);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, dophysicalcopy);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, remoteStorage, NULL, NULL, false, dophysicalcopy);
cloner.overwriteFlags = overwriteFlags;
#ifndef _CONTAINERIZED
//In containerized mode there is no need to replicate files to the local disks of the roxie cluster - so don't set the special flag
Expand Down
Loading

0 comments on commit d048f94

Please sign in to comment.